tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject git commit: TAJO-1045: Implement nextFetch(RowBlock) of RCFile scanner. (jinho)
Date Mon, 27 Oct 2014 02:25:14 GMT
Repository: tajo
Updated Branches:
  refs/heads/block_iteration 6fb2b151c -> cd114d2c8


TAJO-1045: Implement nextFetch(RowBlock) of RCFile scanner. (jinho)

Closes #205


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cd114d2c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cd114d2c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cd114d2c

Branch: refs/heads/block_iteration
Commit: cd114d2c8f9ead1277748bd00014b36ec9936749
Parents: 6fb2b15
Author: jhkim <jhkim@apache.org>
Authored: Mon Oct 27 11:24:22 2014 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Mon Oct 27 11:24:22 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../java/org/apache/tajo/datum/Inet4Datum.java  | 12 +--
 .../org/apache/tajo/util/ReflectionUtil.java    | 25 +++++-
 .../storage/BinarySerializerDeserializer.java   | 63 +++++++++++++++
 .../tajo/storage/SerializerDeserializer.java    |  4 +
 .../storage/TextSerializerDeserializer.java     |  5 +-
 .../org/apache/tajo/storage/rcfile/RCFile.java  | 82 +++++++++++++++++---
 .../apache/tajo/storage/TestNextFetches.java    |  4 +-
 8 files changed, 173 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/cd114d2c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 3148d52..4f452af 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,8 @@ Tajo Change Log
 
 Block Iteration - branch
 
+    TAJO-1045: Implement nextFetch(RowBlock) of RCFile scanner. (jinho)
+
     TAJO-1082: Implement nextFetch(RowBlock) of RawFileScanner. (jinho)
 
     TAJO-1044: Implement nextFetch(RowBlock) of Parquer scanner. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd114d2c/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java
index ed48a02..b293642 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Inet4Datum.java
@@ -42,17 +42,17 @@ public class Inet4Datum extends Datum {
 
 	public Inet4Datum(byte[] addr) {
     super(Type.INET4);
-		Preconditions.checkArgument(addr.length == size);
-    address = addr[3] & 0xFF
-        | ((addr[2] << 8) & 0xFF00)
-        | ((addr[1] << 16) & 0xFF0000)
-        | ((addr[0] << 24) & 0xFF000000);
+    address = readAsInt(addr, 0, addr.length);
   }
 
   public Inet4Datum(byte[] addr, int offset, int length) {
     super(Type.INET4);
+    address = readAsInt(addr, offset, length);
+  }
+
+  public static int readAsInt(byte[] addr, int offset, int length) {
     Preconditions.checkArgument(length == size);
-    address = addr[offset + 3] & 0xFF
+    return addr[offset + 3] & 0xFF
         | ((addr[offset + 2] << 8) & 0xFF00)
         | ((addr[offset + 1] << 16) & 0xFF0000)
         | ((addr[offset] << 24) & 0xFF000000);

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd114d2c/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
index 410815f..2971159 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
@@ -32,8 +32,25 @@ public class ReflectionUtil {
  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
       new ConcurrentHashMap<Class<?>, Constructor<?>>();
 
-	public static Object newInstance(Class<?> clazz) 
-			throws InstantiationException, IllegalAccessException {         
-		return clazz.newInstance();
-	}
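+  /** Creates an instance of {@code theClass} via its cached, accessible no-arg constructor.
+   * NOTE(review): unlike Hadoop's ReflectionUtils.newInstance(Class, Configuration), this never calls setConf().
+   * @param theClass class of which an object is created
+   * @return a new object; lookup/instantiation failures are rethrown wrapped in a RuntimeException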
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T newInstance(Class<T> theClass) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance();
+    } catch (Throwable e) {
+      throw new RuntimeException(e);
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd114d2c/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
index 609a3df..543d891 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.Message;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.datum.*;
+import org.apache.tajo.tuple.offheap.RowWriter;
 import org.apache.tajo.util.Bytes;
 
 import java.io.IOException;
@@ -157,6 +158,68 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
     return datum;
   }
 
+  public void write(RowWriter writer, Column col, byte [] bytes, int offset, int length, byte [] nullChar)
+      throws IOException {
+
+    if (length == 0) {
+      writer.skipField();
+      return;
+    } else {
+      switch (col.getDataType().getType()) {
+        case BOOLEAN:
+          writer.putBool(BooleanDatum.TRUE_INT == bytes[offset]);
+          break;
+        case BIT:
+          writer.putByte(bytes[offset]);
+          break;
+        case CHAR:
+        case TEXT:
+          if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, 0, INVALID_UTF__SINGLE_BYTE.length, bytes, offset, length)) {
+            writer.putText(new byte[0]);
+          } else {
+            writer.putText(bytes, offset, length);
+          }
+          break;
+
+        case INT1:
+        case INT2:
+          writer.putInt2(Bytes.toShort(bytes, offset, length));
+          break;
+
+        case INT4:
+          writer.putInt4((int) Bytes.readVLong(bytes, offset));
+          break;
+
+        case INT8:
+          writer.putInt8(Bytes.readVLong(bytes, offset));
+          break;
+
+        case FLOAT4:
+          writer.putFloat4(toFloat(bytes, offset, length));
+          break;
+
+        case FLOAT8:
+          writer.putFloat8(toDouble(bytes, offset, length));
+          break;
+
+        case PROTOBUF:
+          writer.putBlob(bytes, offset, length);
+          break;
+
+        case INET4:
+          writer.putInet4(Inet4Datum.readAsInt(bytes, offset, length));
+          break;
+
+        case BLOB:
+          writer.putBlob(bytes, offset, length);
+          break;
+
+        default:
+          writer.skipField();
+      }
+    }
+  }
+
   private byte[] shortBytes = new byte[2];
 
   public int writeShort(OutputStream out, short val) throws IOException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd114d2c/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
index 333f205..2d0f8ba 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage;
 
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.tuple.offheap.RowWriter;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -31,4 +32,7 @@ public interface SerializerDeserializer {
 
  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
 
+  public void write(RowWriter writer, Column col, byte[] bytes, int offset, int length, byte[] nullCharacters)
+      throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd114d2c/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
index 6dfe6c1..91a2d86 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -216,7 +216,8 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
     return datum;
   }
 
-  public static void write(RowWriter writer, Column col, byte [] bytes, int offset, int length, byte [] nullChar) throws IOException {
+  public void write(RowWriter writer, Column col, byte [] bytes, int offset, int length, byte [] nullChar)
+      throws IOException {
     TajoDataTypes.Type type = col.getDataType().getType();
     boolean nullField;
     if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
@@ -236,7 +237,7 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
 
       case CHAR:
       case TEXT:
-        writer.putText(bytes);
+        writer.putText(bytes, offset, length);
         break;
 
       case INT1:

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd114d2c/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index e5507ad..3641398 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -39,6 +39,9 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlockWriter;
+import org.apache.tajo.util.ReflectionUtil;
 
 import java.io.Closeable;
 import java.io.*;
@@ -738,7 +741,7 @@ public class RCFile {
         try {
           Class<? extends CompressionCodec> codecClass = conf.getClassByName(
               codecClassname).asSubclass(CompressionCodec.class);
-          codec = ReflectionUtils.newInstance(codecClass, conf);
+          codec = ReflectionUtil.newInstance(codecClass);
         } catch (ClassNotFoundException cnfe) {
           throw new IllegalArgumentException(
               "Unknown codec: " + codecClassname, cnfe);
@@ -762,7 +765,7 @@ public class RCFile {
       String serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE,
           BinarySerializerDeserializer.class.getName());
       try {
-        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+        serde = (SerializerDeserializer) ReflectionUtil.newInstance(Class.forName(serdeClass));
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         throw new IOException(e);
@@ -1373,7 +1376,7 @@ public class RCFile {
         } else{
           serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
         }
-        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+        serde = (SerializerDeserializer) ReflectionUtil.newInstance(Class.forName(serdeClass));
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         throw new IOException(e);
@@ -1620,24 +1623,38 @@ public class RCFile {
 
     @Override
     public Tuple next() throws IOException {
+      if(!hasNext()) return null;
+
+      Tuple tuple = new VTuple(schema.size());
+      getCurrentRow(tuple);
+      return tuple;
+    }
+
+    public boolean hasNext() throws IOException {
       if (!more) {
-        return null;
+        return false;
       }
 
       more = nextBuffer(rowId);
       long lastSeenSyncPos = lastSeenSyncPos();
       if (lastSeenSyncPos >= endOffset) {
         more = false;
-        return null;
+        return more;
       }
 
-      if (!more) {
-        return null;
+      return more;
+    }
+
+    @Override
+    public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+      rowBlock.clear();
+
+      OffHeapRowBlockWriter writer = (OffHeapRowBlockWriter) rowBlock.getWriter();
+      while(rowBlock.rows() < rowBlock.maxRowNum() && hasNext()) {
+        getCurrentRowBlock(writer);
       }
 
-      Tuple tuple = new VTuple(schema.size());
-      getCurrentRow(tuple);
-      return tuple;
+      return rowBlock.rows() > 0;
     }
 
     @Override
@@ -1722,6 +1739,51 @@ public class RCFile {
     }
 
     /**
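+     * Writes the current row into {@code writer} as one field per schema column; unselected
+     * and null columns are written as skipped fields. Call {@link #hasNext()} first (as
+     * {@link #nextFetch(OffHeapRowBlock)} does); returns without writing if !keyInit or rowFetched.
+     * @throws IOException if reading column data or writing through the serde fails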
+     */
+    public void getCurrentRowBlock(OffHeapRowBlockWriter writer) throws IOException {
+      if (!keyInit || rowFetched) {
+        return;
+      }
+
+      if (!currentValue.inited) {
+        currentValueBuffer();
+      }
+      writer.startRow();
+      int currentSchemaIndex = 0;
+      for (int j = 0; j < selectedColumns.length; ++j) {
+        SelectedColumn col = selectedColumns[j];
+        int i = col.colIndex;
+
+        while(i > currentSchemaIndex){
+          writer.skipField();
+          currentSchemaIndex++;
+        }
+
+        if (col.isNulled) {
+          writer.skipField();
+        } else {
+          colAdvanceRow(j, col);
+
+          serde.write(writer, schema.getColumn(i),
+              currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars);
+          col.rowReadIndex += col.prvLength;
+        }
+        currentSchemaIndex++;
+      }
+
+      while(schema.size() > currentSchemaIndex){
+        writer.skipField();
+        currentSchemaIndex++;
+      }
+      writer.endRow();
+      rowFetched = true;
+    }
+
+    /**
      * Advance column state to the next now: update offsets, run lengths etc
      *
      * @param selCol - index among selectedColumns

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd114d2c/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
index 945ed71..48a8a68 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
@@ -151,7 +151,7 @@ public class TestNextFetches {
         DatumFactory.createFloat4(77.9f),
         DatumFactory.createFloat8(271.9f),
         DatumFactory.createText("hyunsik"),
-        DatumFactory.createBlob("hyunsik baaaabo".getBytes()),
+        DatumFactory.createBlob("emiya muljomdao".getBytes()),
         DatumFactory.createInet4("192.168.0.1"),
         NullDatum.get(),
         factory.createDatum(queryid.getProto())
@@ -164,7 +164,7 @@ public class TestNextFetches {
         {StoreType.CSV, true, true, true},
         // TODO - to be implemented
         {StoreType.RAW, false, true, true},
-//        {StoreType.RCFILE, true, true, false},
+        {StoreType.RCFILE, true, true, false},
         {StoreType.BLOCK_PARQUET, false, false, false},
 //        {StoreType.SEQUENCEFILE, true, true, false},
 //        {StoreType.AVRO, false, false, false},


Mime
View raw message