tajo-commits mailing list archives

From jihoon...@apache.org
Subject [10/14] tajo git commit: TAJO-1359: Add nested field projector and language extension to project nested record. (hyunsik)
Date Tue, 12 May 2015 03:14:09 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
index 204f607..0d1c94a 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
 import net.minidev.json.JSONObject;
 import net.minidev.json.parser.JSONParser;
 import net.minidev.json.parser.ParseException;
+import org.apache.tajo.catalog.*;
 import org.apache.commons.net.util.Base64;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaUtil;
@@ -35,26 +36,212 @@ import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.text.TextLineDeserializer;
 import org.apache.tajo.storage.text.TextLineParsingError;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
 
 public class JsonLineDeserializer extends TextLineDeserializer {
   private JSONParser parser;
-  private Type[] types;
-  private String[] columnNames;
+  // Full Path -> Type
+  private Map<String, Type> types;
+  private String [] projectedPaths;
 
-  public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
-    super(schema, meta, targetColumnIndexes);
+  public JsonLineDeserializer(Schema schema, TableMeta meta, Column [] projected) {
+    super(schema, meta);
+
+    projectedPaths = new String[projected.length];
+    for (int i = 0; i < projected.length; i++) {
+      this.projectedPaths[i] = projected[i].getSimpleName();
+    }
   }
 
   @Override
   public void init() {
-    types = SchemaUtil.toTypes(schema);
-    columnNames = SchemaUtil.toSimpleNames(schema);
+    types = TUtil.newHashMap();
+    for (Column column : schema.getAllColumns()) {
 
+      // Keep only the types that belong to projected paths.
+      // For example, assume that a projected path is 'name/first_name', where name is RECORD and first_name is TEXT.
+      // In this case, we should keep two types:
+      // * name - RECORD
+      // * name/first_name - TEXT
+      for (String p : projectedPaths) {
+        if (p.startsWith(column.getSimpleName())) {
+          types.put(column.getSimpleName(), column.getDataType().getType());
+        }
+      }
+    }
     parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR);
   }
 
+  private static String makePath(String [] path, int depth) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i <= depth; i++) {
+      sb.append(path[i]);
+      if (i < depth) {
+        sb.append(NestedPathUtil.PATH_DELIMITER);
+      }
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Recursively finds the field specified by the given nested path in a JSON object
+   * and puts its value into the output tuple at the given field index.
+   *
+   * @param object the JSON object to read the value from
+   * @param fullPath the full path of the field being read (e.g., 'name/first_name')
+   * @param pathElements the path elements split by the path delimiter
+   * @param depth the current depth within pathElements
+   * @param fieldIndex the index of the field in the output tuple
+   * @param output the output tuple
+   * @throws IOException
+   */
+  private void getValue(JSONObject object,
+                        String fullPath,
+                        String [] pathElements,
+                        int depth,
+                        int fieldIndex,
+                        Tuple output) throws IOException {
+    String fieldName = pathElements[depth];
+
+    if (!object.containsKey(fieldName)) {
+      output.put(fieldIndex, NullDatum.get());
+      return;
+    }
+
+    switch (types.get(fullPath)) {
+    case BOOLEAN:
+      String boolStr = object.getAsString(fieldName);
+      if (boolStr != null) {
+        output.put(fieldIndex, DatumFactory.createBool(boolStr.equals("true")));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case CHAR:
+      String charStr = object.getAsString(fieldName);
+      if (charStr != null) {
+        output.put(fieldIndex, DatumFactory.createChar(charStr));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case INT1:
+    case INT2:
+      Number int2Num = object.getAsNumber(fieldName);
+      if (int2Num != null) {
+        output.put(fieldIndex, DatumFactory.createInt2(int2Num.shortValue()));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case INT4:
+      Number int4Num = object.getAsNumber(fieldName);
+      if (int4Num != null) {
+        output.put(fieldIndex, DatumFactory.createInt4(int4Num.intValue()));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case INT8:
+      Number int8Num = object.getAsNumber(fieldName);
+      if (int8Num != null) {
+        output.put(fieldIndex, DatumFactory.createInt8(int8Num.longValue()));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case FLOAT4:
+      Number float4Num = object.getAsNumber(fieldName);
+      if (float4Num != null) {
+        output.put(fieldIndex, DatumFactory.createFloat4(float4Num.floatValue()));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case FLOAT8:
+      Number float8Num = object.getAsNumber(fieldName);
+      if (float8Num != null) {
+        output.put(fieldIndex, DatumFactory.createFloat8(float8Num.doubleValue()));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case TEXT:
+      String textStr = object.getAsString(fieldName);
+      if (textStr != null) {
+        output.put(fieldIndex, DatumFactory.createText(textStr));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case TIMESTAMP:
+      String timestampStr = object.getAsString(fieldName);
+      if (timestampStr != null) {
+        output.put(fieldIndex, DatumFactory.createTimestamp(timestampStr));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case TIME:
+      String timeStr = object.getAsString(fieldName);
+      if (timeStr != null) {
+        output.put(fieldIndex, DatumFactory.createTime(timeStr));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case DATE:
+      String dateStr = object.getAsString(fieldName);
+      if (dateStr != null) {
+        output.put(fieldIndex, DatumFactory.createDate(dateStr));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+    case BIT:
+    case BINARY:
+    case VARBINARY:
+    case BLOB: {
+      Object jsonObject = object.getAsString(fieldName);
+
+      if (jsonObject == null) {
+        output.put(fieldIndex, NullDatum.get());
+        break;
+      }
+
+      output.put(fieldIndex, DatumFactory.createBlob(Base64.decodeBase64((String) jsonObject)));
+      break;    
+    }
+    case INET4:
+      String inetStr = object.getAsString(fieldName);
+      if (inetStr != null) {
+        output.put(fieldIndex, DatumFactory.createInet4(inetStr));
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+
+    case RECORD:
+      JSONObject nestedObject = (JSONObject) object.get(fieldName);
+      if (nestedObject != null) {
+        getValue(nestedObject, fullPath + "/" + pathElements[depth+1], pathElements, depth + 1, fieldIndex, output);
+      } else {
+        output.put(fieldIndex, NullDatum.get());
+      }
+      break;
+
+    case NULL_TYPE:
+      output.put(fieldIndex, NullDatum.get());
+      break;
+
+    default:
+      throw new NotImplementedException(types.get(fullPath).name() + " is not supported.");
+    }
+  }
+
   @Override
  public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError {
     byte[] line = new byte[buf.readableBytes()];
@@ -70,135 +257,9 @@ public class JsonLineDeserializer extends TextLineDeserializer {
       throw new TextLineParsingError(new String(line, TextDatum.DEFAULT_CHARSET), ae);
     }
 
-    for (int i = 0; i < targetColumnIndexes.length; i++) {
-      int actualIdx = targetColumnIndexes[i];
-      String fieldName = columnNames[actualIdx];
-
-      if (!object.containsKey(fieldName)) {
-        output.put(actualIdx, NullDatum.get());
-        continue;
-      }
-
-      switch (types[actualIdx]) {
-        case BOOLEAN:
-          String boolStr = object.getAsString(fieldName);
-          if (boolStr != null) {
-            output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true")));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case CHAR:
-          String charStr = object.getAsString(fieldName);
-          if (charStr != null) {
-            output.put(actualIdx, DatumFactory.createChar(charStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case INT1:
-        case INT2:
-          Number int2Num = object.getAsNumber(fieldName);
-          if (int2Num != null) {
-            output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue()));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case INT4:
-          Number int4Num = object.getAsNumber(fieldName);
-          if (int4Num != null) {
-            output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue()));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case INT8:
-          Number int8Num = object.getAsNumber(fieldName);
-          if (int8Num != null) {
-            output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue()));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case FLOAT4:
-          Number float4Num = object.getAsNumber(fieldName);
-          if (float4Num != null) {
-            output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue()));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case FLOAT8:
-          Number float8Num = object.getAsNumber(fieldName);
-          if (float8Num != null) {
-            output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue()));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case TEXT:
-          String textStr = object.getAsString(fieldName);
-          if (textStr != null) {
-            output.put(actualIdx, DatumFactory.createText(textStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case TIMESTAMP:
-          String timestampStr = object.getAsString(fieldName);
-          if (timestampStr != null) {
-            output.put(actualIdx, DatumFactory.createTimestamp(timestampStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case TIME:
-          String timeStr = object.getAsString(fieldName);
-          if (timeStr != null) {
-            output.put(actualIdx, DatumFactory.createTime(timeStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case DATE:
-          String dateStr = object.getAsString(fieldName);
-          if (dateStr != null) {
-            output.put(actualIdx, DatumFactory.createDate(dateStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-        case BIT:
-        case BINARY:
-        case VARBINARY:
-        case BLOB: {
-          Object jsonObject = object.getAsString(fieldName);
-
-          if (jsonObject == null) {
-            output.put(actualIdx, NullDatum.get());
-            break;
-          }
-
-          output.put(actualIdx, DatumFactory.createBlob(Base64.decodeBase64((String) jsonObject)));
-          break;
-        }
-        case INET4:
-          String inetStr = object.getAsString(fieldName);
-          if (inetStr != null) {
-            output.put(actualIdx, DatumFactory.createInet4(inetStr));
-          } else {
-            output.put(actualIdx, NullDatum.get());
-          }
-          break;
-
-        case NULL_TYPE:
-          output.put(actualIdx, NullDatum.get());
-          break;
-
-        default:
-          throw new NotImplementedException(types[actualIdx].name() + " is not supported.");
-      }
+    for (int i = 0; i < projectedPaths.length; i++) {
+      String [] paths = projectedPaths[i].split(NestedPathUtil.PATH_DELIMITER);
+      getValue(object, paths[0], paths, 0, i, output);
     }
   }
 

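The recursion in getValue() above amounts to walking a '/'-delimited projected path through nested JSONObjects, emitting NULL as soon as any level is missing. Below is a minimal standalone sketch of that descent; the class and method names are illustrative and not part of this patch, but it uses the same json-smart JSONObject/JSONParser classes the deserializer relies on.

import net.minidev.json.JSONObject;
import net.minidev.json.parser.JSONParser;

public class NestedPathSketch {
  // Walk a '/'-delimited path through nested JSONObjects; null stands in for NullDatum.
  static Object resolve(JSONObject obj, String[] path, int depth) {
    String field = path[depth];
    if (!obj.containsKey(field)) {
      return null;                                   // missing field -> NULL
    }
    if (depth == path.length - 1) {
      return obj.get(field);                         // leaf value (scalar)
    }
    Object nested = obj.get(field);
    if (!(nested instanceof JSONObject)) {
      return null;                                   // expected a RECORD but found none
    }
    return resolve((JSONObject) nested, path, depth + 1);
  }

  public static void main(String[] args) throws Exception {
    JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE);
    JSONObject row = (JSONObject) parser.parse("{\"name\": {\"first_name\": \"hyunsik\"}}");
    System.out.println(resolve(row, "name/first_name".split("/"), 0));  // prints: hyunsik
  }
}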
http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
index 6db2c29..5f12d76 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage.json;
 
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.storage.text.TextLineDeserializer;
@@ -26,8 +27,8 @@ import org.apache.tajo.storage.text.TextLineSerializer;
 
 public class JsonLineSerDe extends TextLineSerDe {
   @Override
-  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
-    return new JsonLineDeserializer(schema, meta, targetColumnIndexes);
+  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, Column [] projected) {
+    return new JsonLineDeserializer(schema, meta, projected);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
index d6faf2d..34e9661 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
@@ -35,8 +35,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 public class JsonLineSerializer extends TextLineSerializer {
-  private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
-
   private Type [] types;
   private String [] simpleNames;
   private int columnNum;

http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index a091eac..4c675a4 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -77,10 +77,11 @@ public class TajoRecordConverter extends GroupConverter {
         continue;
       }
       Type type = parquetSchema.getType(index);
+      final int writeIndex = i;
       converters[index] = newConverter(column, type, new ParentValueContainer() {
         @Override
         void add(Object value) {
-          TajoRecordConverter.this.set(projectionIndex, value);
+          TajoRecordConverter.this.set(writeIndex, value);
         }
       });
       ++index;
@@ -145,7 +146,7 @@ public class TajoRecordConverter extends GroupConverter {
    */
   @Override
   public void start() {
-    currentTuple = new VTuple(tupleSize);
+    currentTuple = new VTuple(projectionMap.length);
   }
 
   /**
@@ -157,7 +158,7 @@ public class TajoRecordConverter extends GroupConverter {
       final int projectionIndex = projectionMap[i];
       Column column = tajoReadSchema.getColumn(projectionIndex);
       if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
-          || currentTuple.get(projectionIndex) == null) {
+          || currentTuple.get(i) == null) {
         set(projectionIndex, NullDatum.get());
       }
     }

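The TajoRecordConverter change above switches from schema-ordinal indexing to projection-local indexing: the output tuple is now sized by projectionMap.length, and each converter writes to its position within the projection rather than its position in the full read schema. A small sketch of that index mapping follows; the arrays and values are made up for illustration and are not Tajo code.

import java.util.Arrays;

public class ProjectionIndexSketch {
  public static void main(String[] args) {
    // Full read schema has 6 columns; only ordinals 0, 2 and 5 are projected.
    int[] projectionMap = {0, 2, 5};
    Object[] decodedRow = {1, "kim", 3.5, "seoul", "04524", "n/a"};

    // Before the patch: the tuple was sized by the schema.
    // After the patch:  new VTuple(projectionMap.length) sizes it by the projection.
    Object[] tuple = new Object[projectionMap.length];

    for (int writeIndex = 0; writeIndex < projectionMap.length; writeIndex++) {
      int schemaOrdinal = projectionMap[writeIndex];
      tuple[writeIndex] = decodedRow[schemaOrdinal];  // value lands at the projection-local slot
    }
    System.out.println(Arrays.toString(tuple));       // [1, 3.5, n/a]
  }
}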
http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 62e5ed9..af260b4 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -1635,7 +1635,7 @@ public class RCFile {
         return null;
       }
 
-      Tuple tuple = new VTuple(schema.size());
+      Tuple tuple = new VTuple(targets.length);
       getCurrentRow(tuple);
       return tuple;
     }
@@ -1705,16 +1705,16 @@ public class RCFile {
 
       for (int j = 0; j < selectedColumns.length; ++j) {
         SelectedColumn col = selectedColumns[j];
-        int i = col.colIndex;
+        int actualColumnIdx = col.colIndex;
 
         if (col.isNulled) {
-          tuple.put(i, NullDatum.get());
+          tuple.put(j, NullDatum.get());
         } else {
           colAdvanceRow(j, col);
 
-          Datum datum = serde.deserialize(schema.getColumn(i),
+          Datum datum = serde.deserialize(schema.getColumn(actualColumnIdx),
              currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars);
-          tuple.put(i, datum);
+          tuple.put(j, datum);
           col.rowReadIndex += col.prvLength;
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
index 92a041c..af0973e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
@@ -108,9 +108,9 @@ public class SequenceFileScanner extends FileScanner {
     }
 
 
-    fieldIsNull = new boolean[schema.getColumns().size()];
-    fieldStart = new int[schema.getColumns().size()];
-    fieldLength = new int[schema.getColumns().size()];
+    fieldIsNull = new boolean[schema.getRootColumns().size()];
+    fieldStart = new int[schema.getRootColumns().size()];
+    fieldLength = new int[schema.getRootColumns().size()];
 
     prepareProjection(targets);
 
@@ -172,7 +172,7 @@ public class SequenceFileScanner extends FileScanner {
         Text text = new Text();
         reader.getCurrentValue(text);
         cells = BytesUtils.splitPreserveAllTokens(text.getBytes(), 
-            delimiter, projectionMap, schema.getColumns().size());
+            delimiter, projectionMap, schema.getRootColumns().size());
         totalBytes += (long)text.getBytes().length;
         tuple = new LazyTuple(schema, cells, 0, nullChars, serde);
       }
@@ -197,7 +197,7 @@ public class SequenceFileScanner extends FileScanner {
    * So, tajo must make a tuple after parsing hive style BinarySerDe.
    */
   private Tuple makeTuple(BytesWritable value) throws IOException{
-    Tuple tuple = new VTuple(schema.getColumns().size());
+    Tuple tuple = new VTuple(schema.getRootColumns().size());
 
     int start = 0;
     int length = value.getLength();
@@ -213,7 +213,7 @@ public class SequenceFileScanner extends FileScanner {
     int lastFieldByteEnd = start + 1;
 
     // Go through all bytes in the byte[]
-    for (int i = 0; i < schema.getColumns().size(); i++) {
+    for (int i = 0; i < schema.getRootColumns().size(); i++) {
       fieldIsNull[i] = true;
       if ((nullByte & (1 << (i % 8))) != 0) {
         fieldIsNull[i] = false;
@@ -322,12 +322,12 @@ public class SequenceFileScanner extends FileScanner {
 
   @Override
   public boolean isProjectable() {
-    return true;
+    return false;
   }
 
   @Override
   public boolean isSelectable() {
-    return true;
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 03a0a26..0901c0b 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.storage.text;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Column;
 import io.netty.buffer.ByteBufProcessor;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -28,6 +29,7 @@ import org.apache.tajo.storage.FieldSerializerDeserializer;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 public class CSVLineDeserializer extends TextLineDeserializer {
   private ByteBufProcessor processor;
@@ -35,8 +37,18 @@ public class CSVLineDeserializer extends TextLineDeserializer {
   private ByteBuf nullChars;
   private int delimiterCompensation;
 
-  public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
-    super(schema, meta, targetColumnIndexes);
+  private int [] targetColumnIndexes;
+  private Column [] projected;
+
+  public CSVLineDeserializer(Schema schema, TableMeta meta, Column [] projected) {
+    super(schema, meta);
+
+    this.projected = projected;
+    targetColumnIndexes = new int[projected.length];
+    for (int i = 0; i < projected.length; i++) {
+      targetColumnIndexes[i] = schema.getColumnId(projected[i].getQualifiedName());
+    }
+    Arrays.sort(targetColumnIndexes);
   }
 
   @Override
@@ -66,7 +78,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
     final int rowLength = lineBuf.readableBytes();
     int start = 0, fieldLength = 0, end = 0;
 
-    //Projection
+    // Projection
     int currentTarget = 0;
     int currentIndex = 0;
 
@@ -83,10 +95,10 @@ public class CSVLineDeserializer extends TextLineDeserializer {
         lineBuf.setIndex(start, start + fieldLength);
 
         try {
-          Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
-          output.put(currentIndex, datum);
+          Datum datum = fieldSerDer.deserialize(lineBuf, projected[currentTarget], currentIndex, nullChars);
+          output.put(currentTarget, datum);
         } catch (Exception e) {
-          output.put(currentIndex, NullDatum.get());
+          output.put(currentTarget, NullDatum.get());
         }
 
         currentTarget++;
@@ -103,7 +115,7 @@ public class CSVLineDeserializer extends TextLineDeserializer {
     /* If a text row is less than table schema size, tuple should set to NullDatum */
     if (projection.length > currentTarget) {
       for (; currentTarget < projection.length; currentTarget++) {
-        output.put(projection[currentTarget], NullDatum.get());
+        output.put(currentTarget, NullDatum.get());
       }
     }
   }

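The new CSVLineDeserializer constructor above resolves each projected column to its position in the table schema via schema.getColumnId() and sorts those positions, so a line can be scanned left to right in file order while values go into projection-local output slots (output.put(currentTarget, ...)). A rough sketch of that bookkeeping, with made-up column names and a plain List standing in for the Schema:

import java.util.Arrays;
import java.util.List;

public class CsvProjectionSketch {
  public static void main(String[] args) {
    List<String> schemaColumns = Arrays.asList("id", "name", "score", "addr");  // table schema order
    String[] projected = {"id", "score"};                                        // projected columns

    // Analogue of targetColumnIndexes[i] = schema.getColumnId(projected[i].getQualifiedName())
    int[] targetColumnIndexes = new int[projected.length];
    for (int i = 0; i < projected.length; i++) {
      targetColumnIndexes[i] = schemaColumns.indexOf(projected[i]);
    }
    Arrays.sort(targetColumnIndexes);  // read fields in on-disk order

    System.out.println(Arrays.toString(targetColumnIndexes));  // [0, 2]
    // While parsing a line, the field at schema index targetColumnIndexes[t]
    // becomes output slot t, matching output.put(currentTarget, datum) in the patch.
  }
}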
http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
index 988d5d1..4ebdbe8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.storage.text;
 
 import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.storage.StorageConstants;
@@ -26,8 +27,8 @@ import org.apache.tajo.util.Bytes;
 
 public class CSVLineSerDe extends TextLineSerDe {
   @Override
-  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
-    return new CSVLineDeserializer(schema, meta, targetColumnIndexes);
+  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, Column [] projected) {
+    return new CSVLineDeserializer(schema, meta, projected);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 5e7bd94..55a2b96 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -275,7 +275,6 @@ public class DelimitedTextFile {
     private final long endOffset;
     /** The number of actual read records */
     private int recordCount = 0;
-    private int[] targetColumnIndexes;
 
     private DelimitedLineReader reader;
     private TextLineDeserializer deserializer;
@@ -321,13 +320,7 @@ public class DelimitedTextFile {
         targets = schema.toArray();
       }
 
-      targetColumnIndexes = new int[targets.length];
-      for (int i = 0; i < targets.length; i++) {
-        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
-      }
-
       super.init();
-      Arrays.sort(targetColumnIndexes);
       if (LOG.isDebugEnabled()) {
         LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset
+ "," + endOffset);
       }
@@ -336,7 +329,7 @@ public class DelimitedTextFile {
         reader.readLine();  // skip first line;
       }
 
-      deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes);
+      deserializer = getLineSerde().createDeserializer(schema, meta, targets);
       deserializer.init();
     }
 
@@ -391,7 +384,7 @@ public class DelimitedTextFile {
             return EmptyTuple.get();
           }
 
-          tuple = new VTuple(schema.size());
+          tuple = new VTuple(targets.length);
           tuple.setOffset(offset);
 
           try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
index 89a7de9..f067cb3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -31,12 +31,10 @@ import java.io.IOException;
 public abstract class TextLineDeserializer {
   protected final Schema schema;
   protected final TableMeta meta;
-  protected final int[] targetColumnIndexes;
 
-  public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) {
+  public TextLineDeserializer(Schema schema, TableMeta meta) {
     this.schema = schema;
     this.meta = meta;
-    this.targetColumnIndexes = targetColumnIndexes;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
index 1a53bb0..c09a83b 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage.text;
 import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.NullDatum;
@@ -36,7 +37,7 @@ public abstract class TextLineSerDe {
   public TextLineSerDe() {
   }
 
-  public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes);
+  public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, Column [] projected);
 
   public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 561e2ef..322818d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -175,7 +175,19 @@ public class TestMergeScanner {
     Tuple tuple;
     while ((tuple = scanner.next()) != null) {
       totalCounts++;
-      if (isProjectableStorage(meta.getStoreType())) {
+
+      if (storeType == StoreType.RAW) {
+        assertEquals(4, tuple.size());
+        assertNotNull(tuple.get(0));
+        assertNotNull(tuple.get(1));
+        assertNotNull(tuple.get(2));
+        assertNotNull(tuple.get(3));
+      } else if (scanner.isProjectable()) {
+        assertEquals(2, tuple.size());
+        assertNotNull(tuple.get(0));
+        assertNotNull(tuple.get(1));
+      } else {
+        assertEquals(4, tuple.size());
         assertNotNull(tuple.get(0));
         assertNull(tuple.get(1));
         assertNotNull(tuple.get(2));
@@ -189,14 +201,13 @@ public class TestMergeScanner {
 
   private static boolean isProjectableStorage(StoreType type) {
     switch (type) {
-      case RCFILE:
-      case PARQUET:
-      case SEQUENCEFILE:
-      case CSV:
-      case AVRO:
-        return true;
-      default:
-        return false;
+    case CSV:
+    case SEQUENCEFILE:
+    case RAW:
+    case ROWFILE:
+      return false;
+    default:
+      return true;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 456ea00..a735307 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -297,15 +297,7 @@ public class TestStorages {
     int tupleCnt = 0;
     Tuple tuple;
     while ((tuple = scanner.next()) != null) {
-      if (storeType == StoreType.RCFILE
-          || storeType == StoreType.CSV
-          || storeType == StoreType.PARQUET
-          || storeType == StoreType.SEQUENCEFILE
-          || storeType == StoreType.AVRO) {
-        assertTrue(tuple.get(0) == null);
-      }
-      assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
-      assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
+      verifyProjectedFields(scanner.isProjectable(), tuple, tupleCnt);
       tupleCnt++;
     }
     scanner.close();
@@ -313,6 +305,20 @@ public class TestStorages {
     assertEquals(tupleNum, tupleCnt);
   }
 
+  private void verifyProjectedFields(boolean projectable, Tuple tuple, int tupleCnt) {
+    if (projectable) {
+      assertTrue(tupleCnt + 2 == tuple.get(0).asInt8());
+      assertTrue(tupleCnt + 3 == tuple.get(1).asFloat4());
+    } else {
+      // RAW and ROWFILE always project all fields.
+      if (storeType != StoreType.RAW && storeType != StoreType.ROWFILE) {
+        assertTrue(tuple.get(0) == null);
+      }
+      assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
+      assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
+    }
+  }
+
   @Test
   public void testVariousTypes() throws IOException {
     boolean handleProtobuf = storeType != StoreType.JSON;
@@ -956,7 +962,7 @@ public class TestStorages {
   @Test
   public void testLessThanSchemaSize() throws IOException {
    /* RAW is an internal storage format; its tuple size must match the schema size. */
-    if (storeType == StoreType.RAW || storeType == StoreType.AVRO){
+    if (storeType == StoreType.RAW || storeType == StoreType.AVRO || storeType == StoreType.PARQUET) {
       return;
     }
 
@@ -1008,7 +1014,12 @@ public class TestStorages {
     Tuple tuple = scanner.next();
     scanner.close();
 
-    assertEquals(expect.get(1), tuple.get(1));
-    assertEquals(NullDatum.get(), tuple.get(4));
+    if (scanner.isProjectable()) {
+      assertEquals(expect.get(1), tuple.get(0));
+      assertEquals(NullDatum.get(), tuple.get(1));
+    } else {
+      assertEquals(expect.get(1), tuple.get(1));
+      assertEquals(NullDatum.get(), tuple.get(4));
+    }
   }
 }

