tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [5/7] tajo git commit: TAJO-2129: Apply new type implementation to Schema and Catalog.
Date Fri, 13 May 2016 05:32:45 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/schema/Field.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/schema/Field.java b/tajo-common/src/main/java/org/apache/tajo/schema/Field.java
new file mode 100644
index 0000000..90193ac
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/schema/Field.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.schema;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.type.Type;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Represents a field in a schema: a pair of a qualified name and a type.
+ *
+ * Both members are final and no method of this class modifies them.
+ */
+public class Field implements Cloneable {
+  /** Name of this field */
+  protected final QualifiedIdentifier name;
+  /** Type of this field */
+  protected final Type type;
+
+  /**
+   * Field constructor
+   * @param name Field name
+   * @param type Field type
+   */
+  public Field(QualifiedIdentifier name, Type type) {
+    this.type = type;
+    this.name = name;
+  }
+
+  /**
+   * Creates a field whose type is a record type made of the given fields.
+   * @param name Field name
+   * @param fields Fields of the record
+   * @return Record field
+   */
+  public static Field Record(QualifiedIdentifier name, Field ... fields) {
+    return Record(name, Arrays.asList(fields));
+  }
+
+  /**
+   * Creates a field whose type is a record type made of the given fields.
+   * @param name Field name
+   * @param fields Fields of the record
+   * @return Record field
+   */
+  public static Field Record(QualifiedIdentifier name, Collection<Field> fields) {
+    return new Field(name, Type.Record(fields));
+  }
+
+  /**
+   * Creates a field from a name string, which is converted with <code>QualifiedIdentifier.$</code>.
+   * @param name Field name string
+   * @param type Field type
+   * @return Field
+   */
+  @VisibleForTesting
+  public static Field Field(String name, Type type) {
+    return new Field(QualifiedIdentifier.$(name), type);
+  }
+
+  /**
+   * Creates a field.
+   * @param name Field name
+   * @param type Field type
+   * @return Field
+   */
+  public static Field Field(QualifiedIdentifier name, Type type) {
+    return new Field(name, type);
+  }
+
+  /**
+   * @return Name of this field
+   */
+  public QualifiedIdentifier name() {
+    return this.name;
+  }
+
+  /**
+   * @return Kind of the type of this field, as reported by <code>Type.kind()</code>
+   */
+  public TajoDataTypes.Type baseType() {
+    return this.type.kind();
+  }
+
+  /**
+   * Returns the type of this field, cast to the type expected by the caller.
+   * The cast is not checked here; a wrong expectation fails at the call site with a ClassCastException.
+   *
+   * @param <T> Expected type class
+   * @return Type of this field
+   */
+  @SuppressWarnings("unchecked") // the caller chooses T; nothing can be verified at this point
+  public <T extends Type> T type() {
+    return (T) type;
+  }
+
+  /**
+   * @return Result of <code>Type.isStruct()</code> of the type of this field
+   */
+  public boolean isStruct() {
+    return type.isStruct();
+  }
+
+  /**
+   * @return Result of <code>Type.isNull()</code> of the type of this field
+   */
+  public boolean isNull() {
+    return type.isNull();
+  }
+
+  @Override
+  public String toString() {
+    return name + " (" + type + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(type, name);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if (obj instanceof Field) {
+      Field other = (Field) obj;
+      // Objects.equals keeps equals() as null-tolerant as hashCode(), which relies on Objects.hash.
+      return Objects.equals(type, other.type) && Objects.equals(name, other.name);
+    }
+
+    return false;
+  }
+
+  /**
+   * Returns this instance itself instead of a copy.
+   */
+  @Override
+  public Field clone() throws CloneNotSupportedException {
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/schema/Identifier.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/schema/Identifier.java b/tajo-common/src/main/java/org/apache/tajo/schema/Identifier.java
index d71dbdc..23ea0fb 100644
--- a/tajo-common/src/main/java/org/apache/tajo/schema/Identifier.java
+++ b/tajo-common/src/main/java/org/apache/tajo/schema/Identifier.java
@@ -18,17 +18,26 @@
 
 package org.apache.tajo.schema;
 
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.common.TajoDataTypes.IdentifierProto;
 import org.apache.tajo.schema.IdentifierPolicy.IdentifierCase;
 
 import java.util.Objects;
 
+import static org.apache.tajo.schema.IdentifierPolicy.DefaultPolicy;
+
 /**
- * Identifier Element
+ * Represents an identifier part
  */
-public class Identifier {
+public class Identifier implements ProtoObject<IdentifierProto> {
   private String name;
   private boolean quoted;
 
+  /**
+   * Identifier constructor
+   * @param name Identifier part string
+   * @param quoted quoted or not
+   */
   private Identifier(String name, boolean quoted) {
     this.name = name;
     this.quoted = quoted;
@@ -42,7 +51,12 @@ public class Identifier {
     return new Identifier(name, quoted);
   }
 
-  public String displayString(IdentifierPolicy policy) {
+  /**
+   * Raw string for an identifier, which is equivalent to an identifier directly used in SQL statements.
+   * @param policy IdentifierPolicy
+   * @return Raw string
+   */
+  public String raw(IdentifierPolicy policy) {
     StringBuilder sb = new StringBuilder();
     if (quoted) {
       appendByCase(sb, policy.storesQuotedIdentifierAs());
@@ -56,11 +70,11 @@ public class Identifier {
   }
 
   /**
-   * Raw string of an identifier
-   * @param policy Identifier Policy
-   * @return raw string
+   * Interned string of an identifier
+   * @param policy IdentifierPolicy
+   * @return interned string
    */
-  public String raw(IdentifierPolicy policy) {
+  public String interned(IdentifierPolicy policy) {
     StringBuilder sb = new StringBuilder();
     if (quoted) {
       appendByCase(sb, policy.storesQuotedIdentifierAs());
@@ -87,7 +101,7 @@ public class Identifier {
 
   @Override
   public String toString() {
-    return displayString(IdentifierPolicy.DefaultPolicy());
+    return raw(DefaultPolicy());
   }
 
   public int hashCode() {
@@ -101,9 +115,21 @@ public class Identifier {
 
     if (obj instanceof Identifier) {
       Identifier other = (Identifier) obj;
-      return other.name == other.name && quoted == other.quoted;
+      return name.equals(other.name) && quoted == other.quoted;
     }
 
     return false;
   }
+
+  @Override
+  public IdentifierProto getProto() {
+    return IdentifierProto.newBuilder()
+        .setName(name)
+        .setQuoted(quoted)
+        .build();
+  }
+
+  public static Identifier fromProto(IdentifierProto proto) {
+    return new Identifier(proto.getName(), proto.getQuoted());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/schema/IdentifierPolicy.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/schema/IdentifierPolicy.java b/tajo-common/src/main/java/org/apache/tajo/schema/IdentifierPolicy.java
index 56762e7..75da186 100644
--- a/tajo-common/src/main/java/org/apache/tajo/schema/IdentifierPolicy.java
+++ b/tajo-common/src/main/java/org/apache/tajo/schema/IdentifierPolicy.java
@@ -24,9 +24,9 @@ package org.apache.tajo.schema;
 public abstract class IdentifierPolicy {
 
   /** Quote String; e.g., 'abc' */
-  public static final String ANSI_SQL_QUOTE_STRING = "'";
+  public static final char ANSI_SQL_QUOTE_STRING = '\'';
   /** Separator; e.g., abc.xyz */
-  public static final String ANSI_SQL_SEPERATOR_STRING = ".";
+  public static final char ANSI_SQL_SEPERATOR_STRING = '.';
   /** Maximum length of identifiers */
   public final static int MAX_IDENTIFIER_LENGTH = 128;
 
@@ -49,7 +49,7 @@ public abstract class IdentifierPolicy {
    *
    * @return the quoting string or a space if quoting is not supported
    */
-  abstract String getIdentifierQuoteString();
+  abstract char getIdentifierQuoteString();
 
   /**
    * Retrieves the <code>String</code> that this policy uses as the separator between
@@ -57,7 +57,7 @@ public abstract class IdentifierPolicy {
    *
    * @return the separator string
    */
-  abstract String getIdentifierSeperator();
+  abstract char getIdentifierSeperator();
 
   /**
    * Retrieves the maximum number of characters this policy allows for a column name.

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/schema/IdentifierUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/schema/IdentifierUtil.java b/tajo-common/src/main/java/org/apache/tajo/schema/IdentifierUtil.java
new file mode 100644
index 0000000..f086410
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/schema/IdentifierUtil.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.schema;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.schema.IdentifierPolicy.IdentifierCase;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.StringUtils;
+
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import static org.apache.tajo.schema.Identifier._;
+import static org.apache.tajo.schema.IdentifierPolicy.DefaultPolicy;
+import static org.apache.tajo.schema.QualifiedIdentifier.$;
+
+/**
+ * Util methods for Identifiers
+ */
+public class IdentifierUtil {
+
+  /** Regular expression matching the delimiter between identifier parts */
+  public final static String IDENTIFIER_DELIMITER_REGEXP = "\\.";
+  /** Delimiter between identifier parts */
+  public final static String IDENTIFIER_DELIMITER = ".";
+  /** Quote string constant (a double quotation mark) */
+  public final static String IDENTIFIER_QUOTE_STRING = "\"";
+  /** Maximum identifier length constant */
+  public final static int MAX_IDENTIFIER_LENGTH = 128;
+
+  /** Upper-cased reserved keywords; filled from {@link #RESERVED_KEYWORDS} when this class is initialized */
+  public static final Set<String> RESERVED_KEYWORDS_SET = new HashSet<>();
+  /** Reserved keywords in upper case */
+  public static final String [] RESERVED_KEYWORDS = {
+      "AS", "ALL", "AND", "ANY", "ASYMMETRIC", "ASC",
+      "BOTH",
+      "CASE", "CAST", "CREATE", "CROSS", "CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP",
+      "DESC", "DISTINCT",
+      "END", "ELSE", "EXCEPT",
+      "FALSE", "FULL", "FROM",
+      "GROUP",
+      "HAVING",
+      "ILIKE", "IN", "INNER", "INTERSECT", "INTO", "IS",
+      "JOIN",
+      "LEADING", "LEFT", "LIKE", "LIMIT",
+      "NATURAL", "NOT", "NULL",
+      "ON", "OUTER", "OR", "ORDER",
+      "RIGHT",
+      "SELECT", "SOME", "SYMMETRIC",
+      "TABLE", "THEN", "TRAILING", "TRUE",
+      "OVER",
+      "UNION", "UNIQUE", "USING",
+      "WHEN", "WHERE", "WINDOW", "WITH"
+  };
+
+  static {
+    // Without this step the set stays empty and a lookup of a reserved keyword can never succeed.
+    for (String keyword : RESERVED_KEYWORDS) {
+      RESERVED_KEYWORDS_SET.add(keyword);
+    }
+  }
+
+  /**
+   * Builds a qualified identifier from a raw identifier string.
+   *
+   * The string is split on dots. A part reported as quoted by {@link #isDelimited(String, IdentifierPolicy)}
+   * is stored without its first and last characters and marked as quoted. Any other part is converted by
+   * {@link #internIdentifierPart(String, IdentifierPolicy)}.
+   *
+   * @param raw Raw identifier string; a null or empty string is passed to <code>$</code> as it is
+   * @param policy IdentifierPolicy
+   * @return Qualified identifier
+   */
+  public static QualifiedIdentifier makeIdentifier(String raw, IdentifierPolicy policy) {
+    if (raw == null || raw.isEmpty()) {
+      return $(raw);
+    }
+
+    final ImmutableList.Builder<Identifier> parts = ImmutableList.builder();
+    for (String rawPart : raw.split(IDENTIFIER_DELIMITER_REGEXP)) {
+      if (isDelimited(rawPart, policy)) {
+        parts.add(_(stripQuote(rawPart), true));
+      } else {
+        parts.add(internIdentifierPart(rawPart, policy));
+      }
+    }
+    return $(parts.build());
+  }
+
+  /**
+   * Converts an unquoted raw identifier part into an identifier, applying the case in which the policy
+   * stores unquoted identifiers. The case conversion uses {@link Locale#ENGLISH}.
+   *
+   * @param rawPart Unquoted raw identifier part
+   * @param policy IdentifierPolicy
+   * @return Identifier marked as not quoted
+   */
+  public static Identifier internIdentifierPart(String rawPart, IdentifierPolicy policy) {
+    final IdentifierCase storedCase = policy.storesUnquotedIdentifierAs();
+
+    // Any case other than lower or upper keeps the part as it is.
+    String folded = rawPart;
+    if (storedCase == IdentifierCase.LowerCase) {
+      folded = rawPart.toLowerCase(Locale.ENGLISH);
+    } else if (storedCase == IdentifierCase.UpperCase) {
+      folded = rawPart.toUpperCase(Locale.ENGLISH);
+    }
+
+    return _(folded, false);
+  }
+
+  /**
+   * Normalize an identifier, which can be composed of multiple parts as follows:
+   * <pre>
+   *   database_name.table_name.column_name
+   * </pre>
+   *
+   * The identifier is split on dots, each part is normalized by
+   * {@link #normalizeIdentifierPart(String)}, and the results are joined with dots again.
+   *
+   * @param identifier The identifier to be normalized
+   * @return The normalized identifier. A null or empty identifier is returned as it is.
+   */
+  public static String normalizeIdentifier(String identifier) {
+    if (identifier == null || identifier.equals("")) {
+      return identifier;
+    }
+
+    final String [] parts = identifier.split(IDENTIFIER_DELIMITER_REGEXP);
+    final StringBuilder normalized = new StringBuilder();
+    for (int i = 0; i < parts.length; i++) {
+      // every part except the first one is preceded by a delimiter
+      if (i > 0) {
+        normalized.append(IDENTIFIER_DELIMITER);
+      }
+      normalized.append(normalizeIdentifierPart(parts[i]));
+    }
+    return normalized.toString();
+  }
+
+  /**
+   * Normalize a single identifier part.
+   *
+   * A part reported as quoted by {@link #isDelimited(String)} loses its first and last characters and
+   * keeps its case. Any other part is changed to lower case letters. The conversion uses
+   * {@link Locale#ENGLISH}, like {@link #internIdentifierPart(String, IdentifierPolicy)} does, so the
+   * result does not depend on the default locale of the JVM.
+   *
+   * @param part The identifier part to be normalized
+   * @return The normalized identifier part
+   */
+  public static String normalizeIdentifierPart(String part) {
+    return isDelimited(part) ? stripQuote(part) : part.toLowerCase(Locale.ENGLISH);
+  }
+
+  /**
+   * Denormalize an identifier. Denormalize means a translation from a stored identifier
+   * to be a printable identifier name.
+   *
+   * The identifier is split on dots, each part is translated by {@link #denormalizePart(String)},
+   * and the results are joined with dots again.
+   *
+   * @param identifier The identifier to be denormalized
+   * @return The denormalized identifier
+   */
+  public static String denormalizeIdentifier(String identifier) {
+    final String [] parts = identifier.split(IDENTIFIER_DELIMITER_REGEXP);
+    final StringBuilder denormalized = new StringBuilder();
+    for (int i = 0; i < parts.length; i++) {
+      // every part except the first one is preceded by a delimiter
+      if (i > 0) {
+        denormalized.append(IDENTIFIER_DELIMITER);
+      }
+      denormalized.append(denormalizePart(parts[i]));
+    }
+    return denormalized.toString();
+  }
+
+  /**
+   * Denormalize a single identifier part. A part for which {@link #isShouldBeQuoted(String)} is true
+   * is passed through <code>StringUtils.doubleQuote</code>. Any other part is returned as it is.
+   *
+   * @param identifier The identifier part to be denormalized
+   * @return The denormalized identifier part
+   */
+  public static String denormalizePart(String identifier) {
+    return isShouldBeQuoted(identifier) ? StringUtils.doubleQuote(identifier) : identifier;
+  }
+
+  /**
+   * True if an interned identifier part must be quoted in order to be read back as the same identifier.
+   *
+   * It is the case if the part is contained in {@link #RESERVED_KEYWORDS_SET} regardless of case,
+   * or if it has an upper case character or a character for which
+   * <code>StringUtils.isPartOfAnsiSQLIdentifier</code> is false.
+   *
+   * @param interned Interned identifier part
+   * @return True if the part should be quoted. Otherwise, it will return False.
+   */
+  public static boolean isShouldBeQuoted(String interned) {
+    // The keyword lookup does not depend on a single character, so it is done once before the loop.
+    // Locale.ENGLISH keeps the conversion independent of the default locale of the JVM.
+    if (RESERVED_KEYWORDS_SET.contains(interned.toUpperCase(Locale.ENGLISH))) {
+      return true;
+    }
+
+    for (char character : interned.toCharArray()) {
+      if (Character.isUpperCase(character)) {
+        return true;
+      }
+
+      if (!StringUtils.isPartOfAnsiSQLIdentifier(character)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Removes the first and the last characters of a string. It does not check what they are.
+   *
+   * @param raw String enclosed by quote characters
+   * @return The string between the first and the last characters
+   */
+  public static String stripQuote(String raw) {
+    final int closingQuoteIdx = raw.length() - 1;
+    return raw.substring(1, closingQuoteIdx);
+  }
+
+  /**
+   * Same as {@link #isDelimited(String, IdentifierPolicy)} with the default identifier policy.
+   *
+   * @param raw Raw identifier part
+   * @return True if the part is enclosed by the quote character of the default policy
+   */
+  public static boolean isDelimited(String raw) {
+    final IdentifierPolicy defaultPolicy = DefaultPolicy();
+    return isDelimited(raw, defaultPolicy);
+  }
+
+  /**
+   * True if a raw identifier part is enclosed by the quote character of a given policy.
+   *
+   * @param raw Raw identifier part
+   * @param policy IdentifierPolicy providing the quote character
+   * @return True if both the first and the last characters are the quote character.
+   *         Otherwise, it will return False.
+   * @throws IllegalArgumentException if the part is empty, is a single quote character,
+   *         or is a pair of quote characters enclosing nothing
+   */
+  public static boolean isDelimited(String raw, IdentifierPolicy policy) {
+    // an empty part has no character to inspect
+    if (raw.isEmpty()) {
+      throw new IllegalArgumentException("Invalid Identifier: " + raw);
+    }
+
+    char quoteString = policy.getIdentifierQuoteString();
+    boolean openQuote = raw.charAt(0) == quoteString;
+    boolean closeQuote = raw.charAt(raw.length() - 1) == quoteString;
+
+    // if at least one quote mark exists, the identifier must be greater than or equal to 2 characters.
+    // For a single character both flags are always equal, so they are combined with OR.
+    if ((openQuote || closeQuote) && raw.length() < 2) {
+      throw new IllegalArgumentException("Invalid Identifier: " + raw);
+    }
+
+    // does not allow the empty identifier (''),
+    if (openQuote && closeQuote && raw.length() == 2) {
+      throw new IllegalArgumentException("zero-length delimited identifier: " + raw);
+    }
+
+    // Ensure the quote open and close
+    return openQuote && closeQuote;
+  }
+
+  /**
+   * True if a given name is a simple identifier, meaning that splitting it on dots gives exactly one part.
+   *
+   * @param columnOrTableName Column or Table name to be checked
+   * @return True if a given name is a simple identifier. Otherwise, it will return False.
+   */
+  public static boolean isSimpleIdentifier(String columnOrTableName) {
+    final String [] parts = columnOrTableName.split(IDENTIFIER_DELIMITER_REGEXP);
+    return parts.length == 1;
+  }
+
+  /**
+   * True if splitting a given name on dots gives exactly three parts.
+   *
+   * @param tableName Name to be checked
+   * @return True if a given name consists of three parts. Otherwise, it will return False.
+   */
+  public static boolean isFQColumnName(String tableName) {
+    final String [] parts = tableName.split(IDENTIFIER_DELIMITER_REGEXP);
+    return parts.length == 3;
+  }
+
+  /**
+   * True if a given name contains at least one delimiter.
+   *
+   * @param tableName Table name to be checked
+   * @return True if a given name contains a delimiter. Otherwise, it will return False.
+   */
+  public static boolean isFQTableName(String tableName) {
+    return tableName.lastIndexOf(IDENTIFIER_DELIMITER) >= 0;
+  }
+
+  /**
+   * Splits a qualified table name in the same way as {@link #splitTableName(String)},
+   * but rejects a name without a qualifier.
+   *
+   * @param qualifiedName Qualified table name
+   * @return Array of the qualifier and the simple name
+   * @throws IllegalArgumentException if a given name has no delimiter
+   */
+  public static String [] splitFQTableName(String qualifiedName) {
+    final String [] parts = splitTableName(qualifiedName);
+    if (parts.length != 1) {
+      return parts;
+    }
+
+    throw new IllegalArgumentException("Table name is expected to be qualified, but was \""
+        + qualifiedName + "\".");
+  }
+
+  /**
+   * Splits a table name at its last delimiter.
+   *
+   * @param tableName Table name
+   * @return Array of the qualifier and the simple name, or an array of only a given name
+   *         if it has no delimiter
+   */
+  public static String [] splitTableName(String tableName) {
+    final int lastDelimiterIdx = tableName.lastIndexOf(IDENTIFIER_DELIMITER);
+    if (lastDelimiterIdx < 0) {
+      return new String [] {tableName};
+    }
+
+    final String qualifier = tableName.substring(0, lastDelimiterIdx);
+    // plus one means skipping a delimiter.
+    final String simpleName = tableName.substring(lastDelimiterIdx + 1);
+    return new String [] {qualifier, simpleName};
+  }
+
+  /**
+   * Joins given identifiers with the delimiter, in the given order.
+   *
+   * @param identifiers Identifiers to be joined
+   * @return The joined name, or an empty string if no identifier is given
+   */
+  public static String buildFQName(String... identifiers) {
+    final StringBuilder joined = new StringBuilder();
+    for (int i = 0; i < identifiers.length; i++) {
+      // every identifier except the first one is preceded by a delimiter
+      if (i > 0) {
+        joined.append(IDENTIFIER_DELIMITER);
+      }
+      joined.append(identifiers[i]);
+    }
+    return joined.toString();
+  }
+
+  /**
+   * Separates a qualified name into its qualifier and its simple name.
+   * The argument check is done with <code>Preconditions.checkArgument</code>.
+   *
+   * @param name Qualified name; it must contain a delimiter
+   * @return Pair of the qualifier and the simple name
+   */
+  public static Pair<String, String> separateQualifierAndName(String name) {
+    Preconditions.checkArgument(isFQTableName(name), "Must be a qualified name.");
+
+    final String qualifier = extractQualifier(name);
+    final String simpleName = extractSimpleName(name);
+    return new Pair<>(qualifier, simpleName);
+  }
+
+  /**
+   * Extract a qualification name from an identifier, which is everything before the last delimiter.
+   *
+   * For example, consider a table identifier like 'database1.table1'.
+   * In this case, this method extracts 'database1'.
+   *
+   * @param name The identifier to be extracted
+   * @return The extracted qualifier, or <code>TajoConstants.EMPTY_STRING</code> if there is no delimiter
+   */
+  public static String extractQualifier(String name) {
+    final int lastDelimiterIdx = name.lastIndexOf(IDENTIFIER_DELIMITER);
+    return lastDelimiterIdx > -1 ? name.substring(0, lastDelimiterIdx) : TajoConstants.EMPTY_STRING;
+  }
+
+  /**
+   * Extract a simple name from an identifier, which is everything after the last delimiter.
+   *
+   * For example, consider a table identifier like 'database1.table1'.
+   * In this case, this method extracts 'table1'.
+   *
+   * @param name The identifier to be extracted
+   * @return The extracted simple name, or a given name itself if there is no delimiter
+   */
+  public static String extractSimpleName(String name) {
+    final int lastDelimiterIdx = name.lastIndexOf(IDENTIFIER_DELIMITER);
+    if (lastDelimiterIdx < 0) {
+      return name;
+    }
+    // plus one means skipping a delimiter.
+    return name.substring(lastDelimiterIdx + 1);
+  }
+
+  /**
+   * Concatenates a database name, the delimiter, and a table name.
+   *
+   * @param databaseName Database name
+   * @param tableName Table name
+   * @return The concatenated table name
+   */
+  public static String getCanonicalTableName(String databaseName, String tableName) {
+    return new StringBuilder(databaseName)
+        .append(IDENTIFIER_DELIMITER)
+        .append(tableName)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/schema/PgSQLIdentifierPolicy.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/schema/PgSQLIdentifierPolicy.java b/tajo-common/src/main/java/org/apache/tajo/schema/PgSQLIdentifierPolicy.java
index 0aca2d7..31f6d08 100644
--- a/tajo-common/src/main/java/org/apache/tajo/schema/PgSQLIdentifierPolicy.java
+++ b/tajo-common/src/main/java/org/apache/tajo/schema/PgSQLIdentifierPolicy.java
@@ -29,12 +29,12 @@ public class PgSQLIdentifierPolicy extends IdentifierPolicy {
   }
 
   @Override
-  public String getIdentifierQuoteString() {
+  public char getIdentifierQuoteString() {
     return ANSI_SQL_QUOTE_STRING;
   }
 
   @Override
-  String getIdentifierSeperator() {
+  char getIdentifierSeperator() {
     return ANSI_SQL_SEPERATOR_STRING;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/schema/QualifiedIdentifier.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/schema/QualifiedIdentifier.java b/tajo-common/src/main/java/org/apache/tajo/schema/QualifiedIdentifier.java
index 57317b5..761958e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/schema/QualifiedIdentifier.java
+++ b/tajo-common/src/main/java/org/apache/tajo/schema/QualifiedIdentifier.java
@@ -22,51 +22,70 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.common.TajoDataTypes.IdentifierProto;
+import org.apache.tajo.common.TajoDataTypes.QualifiedIdentifierProto;
 import org.apache.tajo.util.StringUtils;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.Objects;
 
 import static org.apache.tajo.schema.IdentifierPolicy.DefaultPolicy;
 
-public class QualifiedIdentifier {
+/**
+ * Represents a qualified identifier. In other words, it represents identifiers for
+ * all kinds of database objects, such as databases, tables, and columns.
+ */
+public class QualifiedIdentifier implements ProtoObject<QualifiedIdentifierProto> {
   private ImmutableList<Identifier> names;
 
-  private QualifiedIdentifier(ImmutableList<Identifier> names) {
-    this.names = names;
+  private QualifiedIdentifier(Collection<Identifier> names) {
+    this.names = ImmutableList.copyOf(names);
   }
 
-  public String displayString(final IdentifierPolicy policy) {
-    return StringUtils.join(names, policy.getIdentifierSeperator(), new Function<Identifier, String>() {
+  /**
+   * Return a raw string representation for an identifier, which is equivalent to ones used in SQL statements.
+   *
+   * @param policy IdentifierPolicy
+   * @return Raw identifier string
+   */
+  public String raw(final IdentifierPolicy policy) {
+    return StringUtils.join(names, "" + policy.getIdentifierSeperator(), new Function<Identifier, String>() {
       @Override
       public String apply(@Nullable Identifier identifier) {
-        return identifier.displayString(policy);
+        return identifier.raw(policy);
       }
     });
   }
 
+  public String interned() {
+    return interned(DefaultPolicy());
+  }
+
   /**
    * Raw string of qualified identifier
    * @param policy Identifier Policy
    * @return raw string
    */
-  public String raw(final IdentifierPolicy policy) {
-    return StringUtils.join(names, policy.getIdentifierSeperator(), new Function<Identifier, String>() {
+  public String interned(final IdentifierPolicy policy) {
+    return StringUtils.join(names, "" + policy.getIdentifierSeperator(), new Function<Identifier, String>() {
       @Override
       public String apply(@Nullable Identifier identifier) {
-        return identifier.raw(policy);
+        return identifier.interned(policy);
       }
     });
   }
 
   @Override
   public String toString() {
-    return displayString(DefaultPolicy());
+    return interned(DefaultPolicy());
   }
 
   @Override
   public int hashCode() {
-    return names.hashCode();
+    return Objects.hash(names);
   }
 
   public boolean equals(Object obj) {
@@ -89,12 +108,41 @@ public class QualifiedIdentifier {
     return new QualifiedIdentifier(ImmutableList.copyOf(names));
   }
 
+  public static QualifiedIdentifier fromProto(QualifiedIdentifierProto proto) {
+    Collection<Identifier> ids = Collections2.transform(proto.getNamesList(), new Function<IdentifierProto, Identifier>() {
+      @Override
+      public Identifier apply(@Nullable IdentifierProto identifierProto) {
+        return Identifier.fromProto(identifierProto);
+      }
+    });
+    return new QualifiedIdentifier(ids);
+  }
+
+  /**
+   * It takes interned strings. It assumes all parameters already stripped. It is used only for tests.
+   * @param names interned strings
+   * @return QualifiedIdentifier
+   */
   @VisibleForTesting
   public static QualifiedIdentifier $(String...names) {
-    ImmutableList.Builder<Identifier> builder = new ImmutableList.Builder();
-    for (String n :names) {
-      builder.add(Identifier._(n));
+    final ImmutableList.Builder<Identifier> builder = ImmutableList.builder();
+    for (String n : names) {
+      for (String split : n.split(StringUtils.escapeRegexp(""+DefaultPolicy().getIdentifierSeperator()))) {
+        builder.add(Identifier._(split, IdentifierUtil.isShouldBeQuoted(split)));
+      }
     }
     return new QualifiedIdentifier(builder.build());
   }
+
+  @Override
+  public QualifiedIdentifierProto getProto() {
+    return QualifiedIdentifierProto.newBuilder()
+        .addAllNames(Iterables.transform(names, new Function<Identifier, IdentifierProto>() {
+          @Override
+          public IdentifierProto apply(@Nullable Identifier identifier) {
+            return identifier.getProto();
+          }
+        }))
+        .build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/schema/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/schema/Schema.java b/tajo-common/src/main/java/org/apache/tajo/schema/Schema.java
index dfd0b72..771e03b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/schema/Schema.java
+++ b/tajo-common/src/main/java/org/apache/tajo/schema/Schema.java
@@ -18,162 +18,38 @@
 
 package org.apache.tajo.schema;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import org.apache.tajo.type.Type;
 import org.apache.tajo.util.StringUtils;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.Objects;
-
-import static org.apache.tajo.common.TajoDataTypes.Type.RECORD;
 
 /**
  * A field is a pair of a name and a type. Schema is an ordered list of fields.
  */
-public class Schema implements Iterable<Schema.NamedType> {
-  private final ImmutableList<NamedType> namedTypes;
+public class Schema implements Iterable<Field> {
+  private final ImmutableList<Field> fields;
 
-  public Schema(Collection<NamedType> namedTypes) {
-    this.namedTypes = ImmutableList.copyOf(namedTypes);
+  public Schema(Collection<Field> fields) {
+    this.fields = ImmutableList.copyOf(fields);
   }
 
-  public static Schema Schema(NamedType...fields) {
+  public static Schema Schema(Field...fields) {
     return new Schema(Arrays.asList(fields));
   }
 
-  public static Schema Schema(Collection<NamedType> fields) {
+  public static Schema Schema(Collection<Field> fields) {
     return new Schema(fields);
   }
 
   @Override
   public String toString() {
-    return StringUtils.join(namedTypes, ",");
-  }
-
-  public static NamedStructType Struct(QualifiedIdentifier name, NamedType... namedTypes) {
-    return Struct(name, Arrays.asList(namedTypes));
-  }
-
-  public static NamedStructType Struct(QualifiedIdentifier name, Collection<NamedType> namedTypes) {
-    return new NamedStructType(name, namedTypes);
-  }
-
-  public static NamedPrimitiveType Field(QualifiedIdentifier name, Type type) {
-    return new NamedPrimitiveType(name, type);
+    return StringUtils.join(fields, ", ");
   }
 
   @Override
-  public Iterator<NamedType> iterator() {
-    return namedTypes.iterator();
-  }
-
-  public static abstract class NamedType {
-    protected final QualifiedIdentifier name;
-
-    public NamedType(QualifiedIdentifier name) {
-      this.name = name;
-    }
-
-    public QualifiedIdentifier name() {
-      return this.name;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(name);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-
-      if (this == obj) {
-        return true;
-      }
-
-      if (obj instanceof NamedType) {
-        NamedType other = (NamedType) obj;
-        return this.name.equals(other.name);
-      }
-
-      return false;
-    }
-  }
-
-  public static class NamedPrimitiveType extends NamedType {
-    private final Type type;
-
-    public NamedPrimitiveType(QualifiedIdentifier name, Type type) {
-      super(name);
-      Preconditions.checkArgument(type.baseType() != RECORD);
-      this.type = type;
-    }
-
-    public Type type() {
-      return type;
-    }
-
-    @Override
-    public String toString() {
-      return name + " " + type;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(name, type);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (obj instanceof NamedPrimitiveType) {
-        NamedPrimitiveType other = (NamedPrimitiveType) obj;
-        return super.equals(other) && this.type.equals(type);
-      }
-
-      return false;
-    }
-  }
-
-  public static class NamedStructType extends NamedType {
-    private final ImmutableList<NamedType> fields;
-
-    public NamedStructType(QualifiedIdentifier name, Collection<NamedType> fields) {
-      super(name);
-      this.fields = ImmutableList.copyOf(fields);
-    }
-
-    public Collection<NamedType> fields() {
-      return this.fields;
-    }
-
-    @Override
-    public String toString() {
-      return name + " record (" + StringUtils.join(fields, ",") + ")";
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(name, fields);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (obj instanceof NamedStructType) {
-        NamedStructType other = (NamedStructType) obj;
-        return super.equals(other) && fields.equals(other.fields);
-      }
-
-      return false;
-    }
+  public Iterator<Field> iterator() {
+    return fields.iterator();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/schema/package-info.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/schema/package-info.java b/tajo-common/src/main/java/org/apache/tajo/schema/package-info.java
new file mode 100644
index 0000000..88ce440
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/schema/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides Schema and its related classes and utilities.
+ */
+package org.apache.tajo.schema;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Any.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Any.java b/tajo-common/src/main/java/org/apache/tajo/type/Any.java
index d61afae..342fdb6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Any.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Any.java
@@ -18,16 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
-
-import java.util.Objects;
+import static org.apache.tajo.common.TajoDataTypes.Type.ANY;
 
 public class Any extends Type {
-
-  public static Any INSTANCE = new Any();
-
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.ANY;
+  public Any() {
+    super(ANY);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Array.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Array.java b/tajo-common/src/main/java/org/apache/tajo/type/Array.java
index 9a595fd..3432238 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Array.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Array.java
@@ -18,47 +18,17 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import com.google.common.collect.ImmutableList;
 
-import java.util.Objects;
+import static org.apache.tajo.common.TajoDataTypes.Type.ARRAY;
 
-public class Array extends Type {
-  private final Type elementType;
+public class Array extends TypeParamterizedType {
 
   public Array(Type elementType) {
-    this.elementType = elementType;
+    super(ARRAY, ImmutableList.of(elementType));
   }
 
   public Type elementType() {
-    return this.elementType;
-  }
-
-  @Override
-  public boolean hasParam() {
-    return true;
-  }
-
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.ARRAY;
-  }
-
-  @Override
-  public String toString() {
-    return "array<" + elementType + ">";
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(baseType(), elementType);
-  }
-
-  @Override
-  public boolean equals(Object object) {
-    if (object instanceof Array) {
-      return elementType.equals(((Array)object).elementType);
-    }
-
-    return false;
+    return this.params.get(0);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Blob.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Blob.java b/tajo-common/src/main/java/org/apache/tajo/type/Blob.java
index 104dd12..360ea13 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Blob.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Blob.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.BLOB;
 
 public class Blob extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.BLOB;
+  public Blob() {
+    super(BLOB);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Bool.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Bool.java b/tajo-common/src/main/java/org/apache/tajo/type/Bool.java
index ddd7300..923e8e9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Bool.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Bool.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.BOOLEAN;
 
 public class Bool extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.BOOLEAN;
+  public Bool() {
+    super(BOOLEAN);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Char.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Char.java b/tajo-common/src/main/java/org/apache/tajo/type/Char.java
index 348e041..586775e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Char.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Char.java
@@ -18,47 +18,16 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import com.google.common.collect.ImmutableList;
 
-import java.util.Objects;
-
-public class Char extends Type {
-  private final int length;
+import static org.apache.tajo.common.TajoDataTypes.Type.CHAR;
 
+public class Char extends ValueParamterizedType {
   public Char(int length) {
-    this.length = length;
+    super(CHAR, ImmutableList.of(length));
   }
 
   public int length() {
-    return length;
-  }
-
-  @Override
-  public boolean hasParam() {
-    return true;
-  }
-
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.CHAR;
-  }
-
-  @Override
-  public String toString() {
-    return "char(" + length + ")";
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(baseType(), length);
-  }
-
-  @Override
-  public boolean equals(Object object) {
-    if (object instanceof Char) {
-      return length == ((Char) object).length;
-    }
-
-    return false;
+    return params.get(0);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Date.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Date.java b/tajo-common/src/main/java/org/apache/tajo/type/Date.java
index 0febc1a..a68ab97 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Date.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Date.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.DATE;
 
 public class Date extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.DATE;
+  public Date() {
+    super(DATE);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Float4.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Float4.java b/tajo-common/src/main/java/org/apache/tajo/type/Float4.java
index 47ccd6b..b8c8218 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Float4.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Float4.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.FLOAT4;
 
 public class Float4 extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.FLOAT4;
+  public Float4() {
+    super(FLOAT4);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Float8.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Float8.java b/tajo-common/src/main/java/org/apache/tajo/type/Float8.java
index 6d2702c..f36fac8 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Float8.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Float8.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.FLOAT8;
 
 public class Float8 extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.FLOAT8;
+  public Float8() {
+    super(FLOAT8);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Inet4.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Inet4.java b/tajo-common/src/main/java/org/apache/tajo/type/Inet4.java
index 05dc0cf..541841e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Inet4.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Inet4.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.INET4;
 
 public class Inet4 extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.INET4;
+  public Inet4() {
+    super(INET4);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Int1.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Int1.java b/tajo-common/src/main/java/org/apache/tajo/type/Int1.java
index 9288328..d86d36e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Int1.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Int1.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.INT1;
 
 public class Int1 extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.INT1;
+  public Int1() {
+    super(INT1);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Int2.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Int2.java b/tajo-common/src/main/java/org/apache/tajo/type/Int2.java
index 44db4cb..e7c2215 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Int2.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Int2.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.INT2;
 
 public class Int2 extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.INT2;
+  public Int2() {
+    super(INT2);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Int4.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Int4.java b/tajo-common/src/main/java/org/apache/tajo/type/Int4.java
index 0837e2c..e8db784 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Int4.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Int4.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.INT4;
 
 public class Int4 extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.INT4;
+  public Int4() {
+    super(INT4);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Int8.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Int8.java b/tajo-common/src/main/java/org/apache/tajo/type/Int8.java
index 3d4049c..999e9ac 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Int8.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Int8.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.INT8;
 
 public class Int8 extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.INT8;
+  public Int8() {
+    super(INT8);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Interval.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Interval.java b/tajo-common/src/main/java/org/apache/tajo/type/Interval.java
index 6c99526..d6dc17e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Interval.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Interval.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.INTERVAL;
 
 public class Interval extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.INTERVAL;
+  public Interval() {
+    super(INTERVAL);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Map.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Map.java b/tajo-common/src/main/java/org/apache/tajo/type/Map.java
index afc1cc3..786d6af 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Map.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Map.java
@@ -18,49 +18,20 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import com.google.common.collect.ImmutableList;
 
-import java.util.Objects;
-
-public class Map extends Type {
-  private final Type keyType;
-  private final Type valueType;
+import static org.apache.tajo.common.TajoDataTypes.Type.MAP;
 
+public class Map extends TypeParamterizedType {
   public Map(Type keyType, Type valueType) {
-    this.keyType = keyType;
-    this.valueType = valueType;
+    super(MAP, ImmutableList.of(keyType, valueType));
   }
 
   public Type keyType() {
-    return this.keyType;
+    return this.params.get(0);
   }
 
   public Type valueType() {
-    return this.valueType;
-  }
-
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.MAP;
-  }
-
-  @Override
-  public String toString() {
-    return "map<" + keyType + "," + valueType + ">";
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(baseType(), keyType, valueType);
-  }
-
-  @Override
-  public boolean equals(Object object) {
-    if (object instanceof Map) {
-      Map other = (Map) object;
-      return keyType.equals(other.keyType) && valueType.equals(other.valueType);
-    }
-
-    return false;
+    return this.params.get(1);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Null.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Null.java b/tajo-common/src/main/java/org/apache/tajo/type/Null.java
index fe823e1..74cf052 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Null.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Null.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.NULL_TYPE;
 
 public class Null extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.NULL_TYPE;
+  public Null() {
+    super(NULL_TYPE);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Numeric.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Numeric.java b/tajo-common/src/main/java/org/apache/tajo/type/Numeric.java
index 75ef392..c21325d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Numeric.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Numeric.java
@@ -18,49 +18,21 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import com.google.common.collect.ImmutableList;
 
-import java.util.Objects;
+import static org.apache.tajo.common.TajoDataTypes.Type.NUMERIC;
 
-public class Numeric extends Type {
-  private final int precision;
-  private final int scale;
+public class Numeric extends ValueParamterizedType {
 
   public Numeric(int precision, int scale) {
-    this.precision = precision;
-    this.scale = scale;
+    super(NUMERIC, ImmutableList.of(precision, scale));
   }
 
   public int precision() {
-    return this.precision;
+    return this.params.get(0);
   }
 
   public int scale() {
-    return this.scale;
-  }
-
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.NUMERIC;
-  }
-
-  @Override
-  public String toString() {
-    return "numeric(" + precision + "," + scale + ")";
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(baseType(), precision, scale);
-  }
-
-  @Override
-  public boolean equals(Object object) {
-    if (object instanceof Numeric) {
-      Numeric other = (Numeric) object;
-      return precision == other.precision && scale == other.scale;
-    }
-
-    return false;
+    return this.params.get(1);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Protobuf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Protobuf.java b/tajo-common/src/main/java/org/apache/tajo/type/Protobuf.java
index 59d3c1a..e286724 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Protobuf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Protobuf.java
@@ -18,25 +18,17 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.PROTOBUF;
 
 public class Protobuf extends Type {
   private String msgName;
 
   public Protobuf(String msgName) {
+    super(PROTOBUF);
     this.msgName = msgName;
   }
 
   public String getMessageName() {
     return this.msgName;
   }
-
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.PROTOBUF;
-  }
-
-  public boolean hasParam() {
-    return true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Record.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Record.java b/tajo-common/src/main/java/org/apache/tajo/type/Record.java
new file mode 100644
index 0000000..46d7e19
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Record.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.type;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tajo.schema.Field;
+import org.apache.tajo.util.StringUtils;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.tajo.common.TajoDataTypes.Type.RECORD;
+
+/**
+ * Represents a record type: an immutable, ordered list of {@link Field}s.
+ */
+public class Record extends Type implements Iterable<Field> {
+  private final ImmutableList<Field> fields;
+
+  public Record(Collection<Field> fields) {
+    super(RECORD);
+    this.fields = ImmutableList.copyOf(fields);
+  }
+
+  public int size() {
+    return fields.size();
+  }
+
+  public Field field(int idx) {
+    return fields.get(idx);
+  }
+
+  public List<Field> fields() {
+    return this.fields;
+  }
+
+  @Override
+  public String toString() {
+    return "RECORD(" + StringUtils.join(fields, ", ") + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(kind(), Objects.hash(fields));
+  }
+
+  @Override
+  public boolean equals(Object object) {
+    if (object instanceof Record) {
+      Record other = (Record) object;
+      return fields.equals(other.fields);
+    }
+
+    return false;
+  }
+
+  @Override
+  public Iterator<Field> iterator() {
+    return fields.iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Struct.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Struct.java b/tajo-common/src/main/java/org/apache/tajo/type/Struct.java
deleted file mode 100644
index 551eb81..0000000
--- a/tajo-common/src/main/java/org/apache/tajo/type/Struct.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.type;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.util.StringUtils;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-public class Struct extends Type {
-  private final ImmutableList<Type> memberTypes;
-
-  public Struct(Collection<Type> memberTypes) {
-    this.memberTypes = ImmutableList.copyOf(memberTypes);
-  }
-
-  public int size() {
-    return memberTypes.size();
-  }
-
-  public Type memberType(int idx) {
-    return memberTypes.get(idx);
-  }
-
-  public List<Type> memberTypes() {
-    return this.memberTypes;
-  }
-
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.RECORD;
-  }
-
-  @Override
-  public String toString() {
-    return "struct(" + StringUtils.join(memberTypes, ",") + ")";
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(baseType(), Objects.hash(memberTypes));
-  }
-
-  @Override
-  public boolean equals(Object object) {
-    if (object instanceof Struct) {
-      Struct other = (Struct) object;
-      return memberTypes.equals(other.memberTypes);
-    }
-
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Text.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Text.java b/tajo-common/src/main/java/org/apache/tajo/type/Text.java
index e523afe..aab49a4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Text.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Text.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.TEXT;
 
 public class Text extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.TEXT;
+  public Text() {
+    super(TEXT);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Time.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Time.java b/tajo-common/src/main/java/org/apache/tajo/type/Time.java
index 3b1414c..12fe5f8 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Time.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Time.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.TIME;
 
 public class Time extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.TIME;
+  public Time() {
+    super(TIME);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Timestamp.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Timestamp.java b/tajo-common/src/main/java/org/apache/tajo/type/Timestamp.java
index 5b02314..577f877 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Timestamp.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Timestamp.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.type;
 
-import org.apache.tajo.common.TajoDataTypes;
+import static org.apache.tajo.common.TajoDataTypes.Type.TIMESTAMP;
 
 public class Timestamp extends Type {
-  @Override
-  public TajoDataTypes.Type baseType() {
-    return TajoDataTypes.Type.TIMESTAMP;
+  public Timestamp() {
+    super(TIMESTAMP);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/Type.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/Type.java b/tajo-common/src/main/java/org/apache/tajo/type/Type.java
index fdea07e..35d666c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/type/Type.java
+++ b/tajo-common/src/main/java/org/apache/tajo/type/Type.java
@@ -18,12 +18,21 @@
 
 package org.apache.tajo.type;
 
+import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.TypeProto;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.schema.Field;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
-public abstract class Type {
+/**
+ * Abstract base class for data types; each instance carries its {@link TajoDataTypes.Type} kind.
+ */
+public abstract class Type implements Cloneable, ProtoObject<TypeProto> {
 
   // No paramter types
   public static final Any Any = new Any();
@@ -43,39 +52,64 @@ public abstract class Type {
   public static final Blob Blob = new Blob();
   public static final Inet4 Inet4 = new Inet4();
 
-  public abstract TajoDataTypes.Type baseType();
+  protected TajoDataTypes.Type kind;
+
+  public Type(TajoDataTypes.Type kind) {
+    this.kind = kind;
+  }
+
+  public TajoDataTypes.Type kind() {
+    return kind;
+  }
+
+  public boolean isTypeParameterized() {
+    return false;
+  }
 
-  public boolean hasParam() {
+  public boolean isValueParameterized() {
     return false;
   }
 
+  public List<Type> getTypeParameters() {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  public List<Integer> getValueParameters() {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
   protected static String typeName(TajoDataTypes.Type type) {
-    return type.name().toLowerCase();
+    return type.name().toUpperCase();
   }
 
   @Override
   public int hashCode() {
-    return baseType().hashCode();
+    return kind().hashCode();
   }
 
   @Override
   public boolean equals(Object t) {
-    return t instanceof Type && ((Type)t).baseType() == baseType();
+    return t instanceof Type && ((Type)t).kind() == kind();
   }
 
   @Override
   public String toString() {
-    return typeName(baseType());
+    return typeName(kind());
   }
 
   public boolean isStruct() {
-    return this.baseType() == TajoDataTypes.Type.RECORD;
+    return this.kind() == TajoDataTypes.Type.RECORD;
   }
 
-  public boolean isNull() { return this.baseType() == TajoDataTypes.Type.NULL_TYPE; }
+  public boolean isNull() { return this.kind() == TajoDataTypes.Type.NULL_TYPE; }
 
+  public static int DEFAULT_PRECISION = 0;
   public static int DEFAULT_SCALE = 0;
 
+  public static Numeric Numeric() {
+    return new Numeric(DEFAULT_PRECISION, DEFAULT_SCALE);
+  }
+
   public static Numeric Numeric(int precision) {
     return new Numeric(precision, DEFAULT_SCALE);
   }
@@ -84,22 +118,6 @@ public abstract class Type {
     return new Numeric(precision, scale);
   }
 
-  public static Date Date() {
-    return new Date();
-  }
-
-  public static Time Time() {
-    return new Time();
-  }
-
-  public static Timestamp Timestamp() {
-    return new Timestamp();
-  }
-
-  public static Interval Interval() {
-    return new Interval();
-  }
-
   public static Char Char(int len) {
     return new Char(len);
   }
@@ -108,30 +126,18 @@ public abstract class Type {
     return new Varchar(len);
   }
 
-  public static Text Text() {
-    return new Text();
-  }
-
-  public static Blob Blob() {
-    return new Blob();
-  }
-
-  public static Inet4 Inet4() {
-    return new Inet4();
-  }
-
-  public static Struct Struct(Collection<Type> types) {
-    return new Struct(types);
-  }
-
-  public static Struct Struct(Type ... types) {
-    return new Struct(Arrays.asList(types));
+  public static Record Record(Collection<Field> types) {
+    return new Record(types);
   }
 
   public static Array Array(Type type) {
     return new Array(type);
   }
 
+  public static Record Record(Field... types) {
+    return new Record(Arrays.asList(types));
+  }
+
   public static Map Map(Type keyType, Type valueType) {
     return new Map(keyType, valueType);
   }
@@ -139,4 +145,9 @@ public abstract class Type {
   public static Null Null() {
     return new Null();
   }
+
+  @Override
+  public TypeProto getProto() {
+    return TypeProtobufEncoder.encode(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/TypeFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/TypeFactory.java b/tajo-common/src/main/java/org/apache/tajo/type/TypeFactory.java
new file mode 100644
index 0000000..e2cf45b
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/type/TypeFactory.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.type;
+
+import org.apache.tajo.Assert;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.exception.NotImplementedException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.schema.Field;
+import org.apache.tajo.util.StringUtils;
+
+import java.util.List;
+
+import static org.apache.tajo.type.Type.Char;
+import static org.apache.tajo.type.Type.Null;
+import static org.apache.tajo.type.Type.Varchar;
+
+public class TypeFactory {
+  /**
+   * Maps a legacy base type that needs no explicit parameters to its {@link Type}.
+   *
+   * @param baseType legacy base type
+   * @return Type matching the base type; CHAR is created with a length of 1
+   */
+  public static Type create(TajoDataTypes.Type baseType) {
+    switch (baseType) {
+    // scalar types
+    case BOOLEAN: return Type.Bool;
+    case INT1: return Type.Int1;
+    case INT2: return Type.Int2;
+    case INT4: return Type.Int4;
+    case INT8: return Type.Int8;
+    case FLOAT4: return Type.Float4;
+    case FLOAT8: return Type.Float8;
+    case DATE: return Type.Date;
+    case TIME: return Type.Time;
+    case TIMESTAMP: return Type.Timestamp;
+    case INTERVAL: return Type.Interval;
+    case CHAR: return Type.Char(1); // default len = 1
+    case TEXT: return Type.Text;
+    case BLOB: return Type.Blob;
+    case INET4: return Type.Inet4;
+    case NULL_TYPE: return Type.Null;
+    case ANY: return Type.Any;
+
+    // a record cannot be built without its fields; fail with a descriptive exception
+    case RECORD:
+      throw new TajoRuntimeException(new NotImplementedException("record projection"));
+
+    // legacy array types
+    case BOOLEAN_ARRAY: return Type.Array(Type.Bool);
+    case INT1_ARRAY: return Type.Array(Type.Int1);
+    case INT2_ARRAY: return Type.Array(Type.Int2);
+    case INT4_ARRAY: return Type.Array(Type.Int4);
+    case INT8_ARRAY: return Type.Array(Type.Int8);
+    case FLOAT4_ARRAY: return Type.Array(Type.Float4);
+    case FLOAT8_ARRAY: return Type.Array(Type.Float8);
+    case TIMESTAMP_ARRAY: return Type.Array(Type.Timestamp);
+    case DATE_ARRAY: return Type.Array(Type.Date);
+    case TIME_ARRAY: return Type.Array(Type.Time);
+    case TEXT_ARRAY: return Type.Array(Type.Text);
+
+    default:
+      throw new TajoRuntimeException(new UnsupportedException(baseType.name()));
+    }
+  }
+
+  /**
+   * Builds a {@link Type} from a legacy base type and its explicit parameters.
+   *
+   * @param baseType    legacy base type
+   * @param typeParams  type parameters; read for ARRAY (exactly 1) and MAP (exactly 2)
+   * @param valueParams integer parameters; read for CHAR and VARCHAR (exactly 1)
+   *                    and for NUMERIC (0, 1 or 2)
+   * @param fieldParams fields; read for RECORD (at least 1)
+   * @return Type built from the base type and the parameters relevant to it
+   */
+  public static Type create(TajoDataTypes.Type baseType,
+                            List<Type> typeParams,
+                            List<Integer> valueParams,
+                            List<Field> fieldParams) {
+    switch (baseType) {
+    case CHAR: {
+      // report the value parameters, which are the ones being validated here
+      Assert.assertCondition(valueParams.size() == 1,
+          "Char type requires 1 integer parameters, but it takes (%s).", StringUtils.join(valueParams));
+      return Char(valueParams.get(0));
+    }
+    case VARCHAR: {
+      // report the value parameters, which are the ones being validated here
+      Assert.assertCondition(valueParams.size() == 1,
+          "Varchar type requires 1 integer parameters, but it takes (%s).", StringUtils.join(valueParams));
+      return Varchar(valueParams.get(0));
+    }
+    case TEXT: return Type.Text;
+
+    case BOOLEAN: return Type.Bool;
+    case INT1: return Type.Int1;
+    case INT2: return Type.Int2;
+    case INT4: return Type.Int4;
+    case INT8: return Type.Int8;
+    case FLOAT4: return Type.Float4;
+    case FLOAT8: return Type.Float8;
+    case NUMERIC: {
+      for (Object p : valueParams) {
+        Assert.assertCondition(p instanceof Integer, "Numeric type requires integer parameters");
+      }
+      // Check the arity first; every path below returns, so this case can never
+      // fall through into the next case label.
+      Assert.assertCondition(valueParams.size() <= 2,
+          "Numeric type can take 2 or less integer parameters, but it takes (%s).", StringUtils.join(valueParams));
+
+      if (valueParams.isEmpty()) {
+        return Numeric.Numeric();
+      } else if (valueParams.size() == 1) {
+        return Numeric.Numeric(valueParams.get(0));
+      } else {
+        return Numeric.Numeric(valueParams.get(0), valueParams.get(1));
+      }
+    }
+
+    case DATE: return Type.Date;
+    case TIME: return Type.Time;
+    case TIMESTAMP: return Type.Timestamp;
+    case INTERVAL: return Type.Interval;
+    case BLOB: return Type.Blob;
+    case INET4: return Type.Inet4;
+
+    case ARRAY: {
+      Assert.assertCondition(typeParams.size() == 1,
+          "Array Type requires 1 type parameters, but it takes (%s).", StringUtils.join(typeParams));
+      return Type.Array(typeParams.get(0));
+    }
+    case RECORD: {
+      Assert.assertCondition(fieldParams.size() >= 1,
+          "Record Type requires at least 1 field parameters, but it takes (%s).", StringUtils.join(fieldParams));
+      return Type.Record(fieldParams);
+    }
+    case MAP: {
+      Assert.assertCondition(typeParams.size() == 2,
+          "Map Type requires 2 type parameters, but it takes (%s).", StringUtils.join(typeParams));
+
+      return Type.Map(typeParams.get(0), typeParams.get(1));
+    }
+    case NULL_TYPE:
+      return Null();
+
+    default:
+      throw new TajoInternalError(new UnsupportedException(baseType.name()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/TypeParamterizedType.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/TypeParamterizedType.java b/tajo-common/src/main/java/org/apache/tajo/type/TypeParamterizedType.java
new file mode 100644
index 0000000..bacf6c7
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/type/TypeParamterizedType.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.type;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.util.StringUtils;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for types that take other types as parameters
+ * (e.g., Map&lt;Int, String&gt; and Array&lt;Double&gt;).
+ */
+public abstract class TypeParamterizedType extends Type {
+  /** Type parameters, in declaration order. */
+  protected ImmutableList<Type> params;
+
+  /**
+   * @param type   legacy base type identifying the kind of this type
+   * @param params type parameters of this type
+   */
+  public TypeParamterizedType(TajoDataTypes.Type type, ImmutableList<Type> params) {
+    super(type);
+    this.params = params;
+  }
+
+  @Override
+  public boolean isTypeParameterized() {
+    return true;
+  }
+
+  @Override
+  public List<Type> getTypeParameters() {
+    return params;
+  }
+
+  /**
+   * Two type-parameterized types are equal when both their kinds and their
+   * type parameters are equal.
+   */
+  @Override
+  public boolean equals(Object object) {
+    if (this == object) {
+      return true;
+    }
+    if (!(object instanceof TypeParamterizedType)) {
+      return false;
+    }
+
+    final TypeParamterizedType that = (TypeParamterizedType) object;
+    return this.kind.equals(that.kind) && params.equals(that.params);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(kind(), params);
+  }
+
+  /**
+   * @return the type name followed by the comma-separated parameters in angle brackets
+   */
+  @Override
+  public String toString() {
+    return typeName(this.kind) + "<" + StringUtils.join(params, ",") + ">";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/TypeProtobufEncoder.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/TypeProtobufEncoder.java b/tajo-common/src/main/java/org/apache/tajo/type/TypeProtobufEncoder.java
new file mode 100644
index 0000000..4fd2897
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/type/TypeProtobufEncoder.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.type;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.QualifiedIdentifierProto;
+import org.apache.tajo.common.TajoDataTypes.TypeElement;
+import org.apache.tajo.common.TajoDataTypes.TypeProto;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.schema.Field;
+import org.apache.tajo.schema.QualifiedIdentifier;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+import static java.util.Collections.EMPTY_LIST;
+import static org.apache.tajo.Assert.assertCondition;
+import static org.apache.tajo.common.TajoDataTypes.Type.*;
+import static org.apache.tajo.type.Type.Array;
+import static org.apache.tajo.type.Type.Map;
+import static org.apache.tajo.type.Type.Record;
+
+public class TypeProtobufEncoder {
+  /**
+   * Encodes a type into its protobuf representation by walking it with a {@link Visitor}.
+   *
+   * @param type type to encode
+   * @return protobuf message built from the elements the visitor appended
+   */
+  public static TypeProto encode(Type type) {
+    final TypeProto.Builder protoBuilder = TypeProto.newBuilder();
+    final Visitor encoder = new Visitor(protoBuilder);
+    encoder.visit(type);
+    return protoBuilder.build();
+  }
+
+  /**
+   * Decodes a protobuf representation back into a type.
+   *
+   * The elements are processed in order with a stack: an element without a child
+   * count is pushed as a primitive type, while an element with a child count pops
+   * that many types and pushes the composite type built from them.
+   *
+   * @param proto protobuf representation of a type
+   * @return the single type left on the stack after all elements are processed
+   */
+  public static Type decode(TypeProto proto) {
+    Stack<Type> stack = new Stack<>();
+
+    for (int curIdx = 0; curIdx < proto.getElementsCount(); curIdx++) {
+      TypeElement e = proto.getElements(curIdx);
+
+      if (!e.hasChildNum()) { // no child count: a primitive type
+        stack.push(createPrimitiveType(e));
+        continue;
+      }
+
+      // the children of this element are the most recently pushed types
+      List<Type> childTypes = popMultiItems(stack, e.getChildNum());
+
+      if (e.getKind() == ARRAY || e.getKind() == MAP) {
+        stack.push(createTypeParameterizedType(e, childTypes));
+        continue;
+      }
+
+      // the only remaining kind that may carry children is RECORD
+      assertCondition(e.getKind() == RECORD,
+          "This type must be RECORD type.");
+      assertCondition(childTypes.size() == e.getFieldNamesCount(),
+          "The number of Field types and names must be equal.");
+
+      ImmutableList.Builder<Field> fields = ImmutableList.builder();
+      for (int i = 0; i < childTypes.size(); i++) {
+        fields.add(new Field(QualifiedIdentifier.fromProto(e.getFieldNames(i)), childTypes.get(i)));
+      }
+      stack.push(Record(fields.build()));
+    }
+
+    // The check fails for an empty proto as well as for leftover types, so the
+    // message reports the actual count instead of assuming "two or more".
+    assertCondition(stack.size() == 1,
+        "A type proto must decode to exactly one root type, but (%s) types remain.", stack.size());
+    return stack.pop();
+  }
+
+  /**
+   * Pops the given number of types from the stack.
+   *
+   * @param stack stack to pop from
+   * @param num   number of items to pop
+   * @return the popped types in the order they were pushed (oldest first)
+   */
+  static List<Type> popMultiItems(Stack<Type> stack, int num) {
+    // fail with a descriptive message rather than an EmptyStackException from pop()
+    assertCondition(num <= stack.size(),
+        "Cannot pop (%s) types from a stack holding only (%s) types.", num, stack.size());
+
+    List<Type> typeParams = new ArrayList<>(Math.max(num, 0));
+    for (int i = 0; i < num; i++) {
+      typeParams.add(stack.pop());
+    }
+    // pop() yields the newest item first; restore the push order in one pass
+    Collections.reverse(typeParams);
+    return typeParams;
+  }
+
+  /**
+   * @param baseType legacy base type
+   * @return true if the base type is CHAR, VARCHAR or NUMERIC
+   */
+  static boolean isValueParameterized(TajoDataTypes.Type baseType) {
+    if (baseType == CHAR) {
+      return true;
+    }
+    if (baseType == VARCHAR) {
+      return true;
+    }
+    return baseType == NUMERIC;
+  }
+
+  /**
+   * Creates a primitive (non MAP/RECORD/ARRAY) type from a proto element.
+   *
+   * @param element proto element describing a primitive type
+   * @return the type built from the element's kind and, if any, its value or string parameters
+   */
+  static Type createPrimitiveType(TypeElement element) {
+    assertPrimitiveType(element);
+
+    if (isValueParameterized(element.getKind())) {
+      // typed empty lists instead of the raw EMPTY_LIST constant avoid unchecked conversions
+      return TypeFactory.create(element.getKind(),
+          Collections.<Type>emptyList(),
+          element.getValueParamsList(),
+          Collections.<Field>emptyList());
+    } else if (element.getKind() == PROTOBUF) { // TODO - PROTOBUF type should be removed later
+      return new Protobuf(element.getStringParams(0));
+    } else {
+      return TypeFactory.create(element.getKind());
+    }
+  }
+
+  /**
+   * Creates an ARRAY or MAP type from a proto element and its already decoded child types.
+   *
+   * @param element    proto element whose kind is ARRAY or MAP
+   * @param childTypes decoded type parameters; exactly 1 for ARRAY and exactly 2 for MAP
+   * @return the array or map type
+   */
+  static Type createTypeParameterizedType(TypeElement element, List<Type> childTypes) {
+    switch (element.getKind()) {
+    case ARRAY:
+      // a wrong child count would otherwise fail on get(0) or silently drop extra children
+      assertCondition(childTypes.size() == 1,
+          "Array Type requires 1 type parameters, but it takes (%s).", childTypes.size());
+      return Array(childTypes.get(0));
+    case MAP:
+      assertCondition(childTypes.size() == 2,
+          "Map Type requires 2 type parameters, but it takes (%s).", childTypes.size());
+      return Map(childTypes.get(0), childTypes.get(1));
+    default:
+      throw new TajoInternalError(element.getKind().name() + " is not a type-parameterized type.");
+    }
+  }
+
+  static void assertPrimitiveType(TypeElement element) {
+    TajoDataTypes.Type baseType = element.getKind();
+    if (baseType == MAP || baseType == RECORD || baseType == ARRAY) {
+      throw new TajoInternalError(baseType.name() + " is not a primitive type.");
+    }
+  }
+
+  /**
+   * Type visitor that appends one {@link TypeElement} per visited type to a proto builder.
+   * Composite types call the super implementation before appending their own element.
+   */
+  static class Visitor extends TypeVisitor {
+    final TypeProto.Builder builder;
+
+    Visitor(TypeProto.Builder builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public void visitPrimitive(Type type) {
+      final TypeElement.Builder element = TypeElement.newBuilder();
+      element.setKind(type.kind);
+
+      if (type.isValueParameterized()) {
+        element.addAllValueParams(type.getValueParameters());
+      } else if (type.kind == PROTOBUF) {
+        // a protobuf type carries its message name as a string parameter
+        final Protobuf protobufType = (Protobuf) type;
+        element.addStringParams(protobufType.getMessageName());
+      }
+
+      builder.addElements(element);
+    }
+
+    @Override
+    public void visitMap(Map map) {
+      super.visitMap(map);
+
+      final TypeElement.Builder element = TypeElement.newBuilder();
+      element.setKind(map.kind);
+      element.setChildNum(2);
+      builder.addElements(element);
+    }
+
+    @Override
+    public void visitArray(Array array) {
+      super.visitArray(array);
+
+      final TypeElement.Builder element = TypeElement.newBuilder();
+      element.setKind(array.kind);
+      element.setChildNum(1);
+      builder.addElements(element);
+    }
+
+    @Override
+    public void visitRecord(Record record) {
+      super.visitRecord(record);
+
+      // the field names travel with the record element, in field order
+      final List<QualifiedIdentifierProto> fieldNames = new ArrayList<>();
+      for (Field field : record.fields()) {
+        fieldNames.add(field.name().getProto());
+      }
+
+      final TypeElement.Builder element = TypeElement.newBuilder();
+      element.setChildNum(record.size());
+      element.addAllFieldNames(fieldNames);
+      element.setKind(RECORD);
+      builder.addElements(element);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d2bf407/tajo-common/src/main/java/org/apache/tajo/type/TypeStringEncoder.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/type/TypeStringEncoder.java b/tajo-common/src/main/java/org/apache/tajo/type/TypeStringEncoder.java
new file mode 100644
index 0000000..b6e1b18
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/type/TypeStringEncoder.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.type;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import org.apache.tajo.Assert;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.schema.Field;
+import org.apache.tajo.schema.IdentifierUtil;
+import org.apache.tajo.schema.QualifiedIdentifier;
+import org.apache.tajo.util.StringUtils;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Stack;
+
+import static org.apache.tajo.schema.IdentifierPolicy.DefaultPolicy;
+
+/**
+ * This class enables to serialize a type into a string representation and vice versa.
+ */
+public class TypeStringEncoder {
+
+  /**
+   * Serialize a type into its string representation: the kind name, followed by
+   * the type parameters in angle brackets, the value parameters in parentheses
+   * and the record fields in square brackets, each part only where it applies.
+   * @param type A type
+   * @return A type string representation
+   */
+  public static String encode(Type type) {
+    final StringBuilder out = new StringBuilder(type.kind().name());
+
+    if (type.isTypeParameterized()) {
+      final Function<Type, String> typeEncoder = new Function<Type, String>() {
+        @Override
+        public String apply(@Nullable Type type) {
+          return TypeStringEncoder.encode(type);
+        }
+      };
+      out.append("<").append(StringUtils.join(type.getTypeParameters(), ",", typeEncoder)).append(">");
+    }
+
+    // Assume all parameter values are integers.
+    if (type.isValueParameterized()) {
+      out.append("(").append(StringUtils.join(type.getValueParameters(), ",")).append(")");
+    }
+
+    if (type.isStruct()) {
+      final Record record = (Record) type;
+      final Function<Field, String> fieldEncoder = new Function<Field, String>() {
+        @Override
+        public String apply(@Nullable Field field) {
+          return serializeField(field);
+        }
+      };
+      out.append("[").append(StringUtils.join(record.fields(), ",", fieldEncoder)).append("]");
+    }
+
+    return out.toString();
+  }
+
+  /**
+   * Make a string from a field
+   * @param field A field
+   * @return String representation for a field
+   */
+  static String serializeField(Field field) {
+    return field.name().raw(DefaultPolicy()) + " " + encode(field.type());
+  }
+
+  /**
+   * Decode a string representation to a Type.
+   *
+   * A signature without any bracket is taken as a plain base type name. Otherwise
+   * the text before the first bracket is the base type name, and the content of
+   * the first outermost bracket pair is parsed as type parameters (&lt;&gt;),
+   * value parameters (()) or record fields ([]).
+   *
+   * @param signature Type string representation
+   * @return Type
+   * @throws TajoInternalError if the signature ends before its outermost bracket is closed
+   */
+  public static Type decode(String signature) {
+
+    // termination condition in this recursion
+    if (!(signature.contains("<") || signature.contains("(") || signature.contains("["))) {
+      return createType(signature,
+          ImmutableList.<Type>of(),
+          ImmutableList.<Integer>of(),
+          ImmutableList.<Field>of());
+    }
+
+    final Stack<Character> stack = new Stack<>();
+    final Stack<Integer> spanStack = new Stack<>();
+    String baseType = null;
+    for (int i = 0; i < signature.length(); i++) {
+      final char c = signature.charAt(i);
+
+      if (c == '<' || c == '[' || c == '(') {
+        if (stack.isEmpty()) { // the outermost bracket ends the base type name
+          Assert.assertCondition(baseType == null, "Expected baseName to be null");
+          baseType = signature.substring(0, i);
+        }
+        stack.push(c);
+        spanStack.push(i + 1);
+
+      } else if (c == '>' || c == ']' || c == ')') {
+        final char expectedOpen = (c == '>') ? '<' : (c == ']') ? '[' : '(';
+        // checking for emptiness first reports an unmatched closing bracket as a
+        // bad signature instead of letting pop() throw EmptyStackException
+        Assert.assertCondition(!stack.isEmpty() && stack.pop() == expectedOpen,
+            "Bad signature: '%s'", signature);
+        final int paramStartIdx = spanStack.pop();
+
+        if (!stack.isEmpty()) { // still nested; only outermost parameters are handled here
+          continue;
+        }
+
+        final String params = signature.substring(paramStartIdx, i);
+        if (c == '>') {
+          return createType(baseType,
+              parseList(params, new Function<String, Type>() {
+                @Override
+                public Type apply(@Nullable String s) {
+                  return decode(s);
+                }
+              }),
+              ImmutableList.<Integer>of(),
+              ImmutableList.<Field>of());
+
+        } else if (c == ']') {
+          return createType(baseType,
+              ImmutableList.<Type>of(),
+              ImmutableList.<Integer>of(),
+              parseList(params, new Function<String, Field>() {
+                @Override
+                public Field apply(@Nullable String s) {
+                  return parseField(s);
+                }
+              }));
+
+        } else {
+          return createType(baseType,
+              ImmutableList.<Type>of(),
+              parseList(params, new Function<String, Integer>() {
+                @Override
+                public Integer apply(@Nullable String s) {
+                  return parseValue(s);
+                }
+              }),
+              ImmutableList.<Field>of());
+        }
+      }
+    }
+
+    // The outermost bracket was never closed; fail here instead of returning null.
+    throw new TajoInternalError("Bad signature: '" + signature + "'");
+  }
+
+  /**
+   * Parse an integer value parameter.
+   * @param literal Decimal integer literal
+   * @return Parsed value
+   */
+  public static int parseValue(String literal) {
+    final int parsed;
+    try {
+      parsed = Integer.parseInt(literal);
+    } catch (NumberFormatException e) {
+      throw new TajoInternalError(e);
+    }
+    return parsed;
+  }
+
+  /**
+   * Parse a string delimited by comma into a list of object instances depending on <pre>itemParser</pre>.
+   * Only commas outside of any bracket pair are treated as delimiters.
+   * @param str String delimited by comma
+   * @param itemParser A function to transform a string to an object.
+   * @param <T> Type to be transformed from a string
+   * @return List of object instances
+   */
+  static <T> List<T> parseList(String str, Function<String, T> itemParser) {
+    if (!str.contains(",")) { // if just one item
+      return ImmutableList.of(itemParser.apply(str));
+    }
+
+    final ImmutableList.Builder<T> fields = ImmutableList.builder();
+    final Stack<Character> stack = new Stack<>();
+    int paramStartIdx = 0;
+    for (int i = 0; i < str.length(); i++) {
+      final char c = str.charAt(i);
+
+      if (c == '<' || c == '[' || c == '(') {
+        stack.push(c);
+      } else if (c == '>' || c == ']' || c == ')') {
+        final char expectedOpen = (c == '>') ? '<' : (c == ']') ? '[' : '(';
+        // checking for emptiness first reports an unmatched closing bracket as a
+        // bad signature instead of letting pop() throw EmptyStackException
+        Assert.assertCondition(!stack.isEmpty() && stack.pop() == expectedOpen,
+            "Bad signature: '%s'", str);
+      } else if (c == ',') {
+        if (stack.isEmpty()) { // ensure outermost type parameters
+          fields.add(itemParser.apply(str.substring(paramStartIdx, i)));
+          paramStartIdx = i + 1;
+        }
+      }
+    }
+
+    Assert.assertCondition(stack.empty(), "Bad signature: '%s'", str);
+    if (paramStartIdx < str.length()) { // the last item has no trailing comma
+      fields.add(itemParser.apply(str.substring(paramStartIdx, str.length())));
+    }
+
+    return fields.build();
+  }
+
+  /**
+   * Make a field from a string representation
+   * @param str String
+   * @return Field
+   */
+  static Field parseField(String str) {
+    // A field consists of an identifier and a type, and they are delimited by space.
+    if (!str.contains(" ")) {
+      Assert.assertCondition(false, "Bad field signature: '%s'", str);
+    }
+
+    // Stack to track the nested bracket depth
+    Stack<Character> stack = new Stack<>();
+    int paramStartIdx = 0;
+    for (int i = 0; i < str.length(); i++) {
+      char c = str.charAt(i);
+
+      if (c == '<' || c == '[' || c == '(') {
+        stack.push(c);
+      } else if (c == '>') { // for validation
+        Assert.assertCondition(stack.pop() == '<', "Bad field signature: '%s'", str);
+      } else if (c == ']') { // for validation
+        Assert.assertCondition(stack.pop() == '[', "Bad field signature: '%s'", str);
+      } else if (c == ')') { // for validation
+        Assert.assertCondition(stack.pop() == '(', "Bad field signature: '%s'", str);
+
+      } else if (c == ' ') {
+        if (stack.isEmpty()) { // ensure outermost type parameters
+          QualifiedIdentifier identifier =
+              IdentifierUtil.makeIdentifier(str.substring(paramStartIdx, i), DefaultPolicy());
+          String typePart = str.substring(i + 1, str.length());
+          return new Field(identifier, decode(typePart));
+        }
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Create a type from a base type name and its parameters.
+   * @param baseTypeStr Name of a {@link TajoDataTypes.Type} constant
+   * @param typeParams Type parameters
+   * @param valueParams Integer value parameters
+   * @param fieldParams Record fields
+   * @return Type created by {@link TypeFactory}
+   * @throws TajoInternalError if the name matches no base type
+   */
+  public static Type createType(String baseTypeStr,
+                                List<Type> typeParams,
+                                List<Integer> valueParams,
+                                List<Field> fieldParams) {
+    final TajoDataTypes.Type baseType;
+
+    try {
+      baseType = TajoDataTypes.Type.valueOf(baseTypeStr);
+    } catch (IllegalArgumentException | NullPointerException e) {
+      // valueOf rejects an unknown name with IllegalArgumentException and a null
+      // name with NullPointerException; JVM errors are no longer swallowed here.
+      throw new TajoInternalError(new UnsupportedException(baseTypeStr));
+    }
+
+    return TypeFactory.create(baseType, typeParams, valueParams, fieldParams);
+  }
+
+}


Mime
View raw message