tajo-commits mailing list archives

From blrun...@apache.org
Subject tajo git commit: TAJO-2052: Upgrading ORC reader version.
Date Mon, 01 Feb 2016 01:56:56 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 6debe7f8a -> 6717e3d1f


TAJO-2052: Upgrading ORC reader version.

Closes #937

Signed-off-by: JaeHwa Jung <blrunner@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6717e3d1
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6717e3d1
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6717e3d1

Branch: refs/heads/master
Commit: 6717e3d1f4ff234bbb99da3224978693355a6221
Parents: 6debe7f
Author: Jongyoung Park <eminency@gmail.com>
Authored: Mon Feb 1 10:49:14 2016 +0900
Committer: JaeHwa Jung <blrunner@apache.org>
Committed: Mon Feb 1 10:49:14 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../apache/tajo/storage/StorageConstants.java   |   2 +
 .../src/main/sphinx/table_management/orc.rst    |   1 +
 tajo-storage/tajo-storage-hdfs/pom.xml          |   2 +-
 .../org/apache/tajo/storage/orc/ORCScanner.java | 190 +++++++++----------
 .../TajoStructObjectInspector.java              |   5 +
 .../thirdparty/orc/FileOrcDataSource.java       | 132 -------------
 .../thirdparty/orc/HdfsOrcDataSource.java       |  24 +--
 8 files changed, 113 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 11fabf1..bd19aae 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-2052: Upgrading ORC reader version. (Jongyoung Park via jaehwa)
+    
     TAJO-1940: Implement HBaseTablespace::getTableVolume() method. (hyunsik)
 
     TAJO-2061: Add description for EXPLAIN statement. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index d7f1ec5..097963c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -82,6 +82,8 @@ public class StorageConstants {
   // ORC file properties -------------------------------------------------
   public static final String ORC_MAX_MERGE_DISTANCE = "orc.max.merge.distance";
   public static final String DEFAULT_ORC_MAX_MERGE_DISTANCE = "1048576";  // 1MB
+  public static final String ORC_MAX_READ_BUFFER_SIZE = "orc.max.read.buffer";
+  public static final String DEFAULT_ORC_MAX_READ_BUFFER_SIZE = "8388608";  // 8MB
 
   public static final String ORC_STRIPE_SIZE = "orc.stripe.size";
   public static final String DEFAULT_ORC_STRIPE_SIZE = "67108864"; // 64MB

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-docs/src/main/sphinx/table_management/orc.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/table_management/orc.rst b/tajo-docs/src/main/sphinx/table_management/orc.rst
index 2733afc..eb84b20 100644
--- a/tajo-docs/src/main/sphinx/table_management/orc.rst
+++ b/tajo-docs/src/main/sphinx/table_management/orc.rst
@@ -34,6 +34,7 @@ The ``WITH`` clause in the CREATE TABLE statement allows users to set those para
 Now, ORC file provides the following physical properties.
 
 * ``orc.max.merge.distance``: When ORC file is read, if stripes are too closer and the distance is lower than this value, they are merged and read at once. Default is 1MB.
+* ``orc.max.read.buffer``: When ORC file is read, it defines the maximum read buffer size, that is, the maximum size of a single read. Default is 8MB.
 * ``orc.stripe.size``: It decides size of each stripe. Default is 64MB.
 * ``orc.compression.kind``: It means the compression algorithm used to compress and write data. It should be one of ``none``, ``snappy``, ``zlib``. Default is ``none``.
 * ``orc.buffer.size``: It decides size of writing buffer. Default is 256KB.
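
For reference, a minimal sketch of how the two read-path properties documented above become presto-orc DataSize values, mirroring the ORCScanner.init() change in this commit; the local "meta" lookup stands in for Tajo's table property map and is illustrative only:

  import io.airlift.units.DataSize;

  // Illustrative sketch: turn the documented table properties into the DataSize values
  // that are handed to HdfsOrcDataSource and OrcReader. "meta" is assumed to expose
  // getProperty(key, default) like Tajo's TableMeta does in ORCScanner.init().
  DataSize maxMergeDistance = new DataSize(
      Double.parseDouble(meta.getProperty("orc.max.merge.distance", "1048576")),  // 1MB default
      DataSize.Unit.BYTE);
  DataSize maxReadSize = new DataSize(
      Double.parseDouble(meta.getProperty("orc.max.read.buffer", "8388608")),     // 8MB default
      DataSize.Unit.BYTE);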

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 3b89e1c..6c10a88 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -358,7 +358,7 @@
     <dependency>
       <groupId>com.facebook.presto</groupId>
       <artifactId>presto-orc</artifactId>
-      <version>0.86</version>
+      <version>0.132</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
index 9351c59..0a4ebc6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
@@ -18,7 +18,16 @@
 
 package org.apache.tajo.storage.orc;
 
+import com.facebook.presto.orc.OrcDataSource;
+import com.facebook.presto.orc.OrcPredicate;
+import com.facebook.presto.orc.OrcReader;
+import com.facebook.presto.orc.OrcRecordReader;
+import com.facebook.presto.orc.memory.AggregatedMemoryContext;
+import com.facebook.presto.orc.metadata.OrcMetadataReader;
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.type.*;
 import com.google.protobuf.InvalidProtocolBufferException;
+import io.airlift.units.DataSize;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,15 +48,13 @@ import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.storage.fragment.Fragment;
-import com.facebook.presto.orc.*;
-import com.facebook.presto.orc.metadata.OrcMetadataReader;
 import org.apache.tajo.storage.thirdparty.orc.HdfsOrcDataSource;
 import org.apache.tajo.util.datetime.DateTimeUtil;
 import org.joda.time.DateTimeZone;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.TimeZone;
 
 /**
@@ -56,42 +63,16 @@ import java.util.TimeZone;
 public class ORCScanner extends FileScanner {
   private static final Log LOG = LogFactory.getLog(ORCScanner.class);
   private OrcRecordReader recordReader;
-  private Vector [] vectors;
+  private Block[] blocks;
   private int currentPosInBatch = 0;
   private int batchSize = 0;
   private Tuple outTuple;
+  private AggregatedMemoryContext aggrMemoryContext = new AggregatedMemoryContext();
 
   public ORCScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) {
     super(conf, schema, meta, fragment);
   }
 
-  private Vector createOrcVector(TajoDataTypes.DataType type) {
-    switch (type.getType()) {
-      case INT1: case INT2: case INT4: case INT8:
-      case INET4:
-      case TIMESTAMP:
-      case DATE:
-        return new LongVector();
-
-      case FLOAT4:
-      case FLOAT8:
-        return new DoubleVector();
-
-      case BOOLEAN:
-      case NULL_TYPE:
-        return new BooleanVector();
-
-      case BLOB:
-      case TEXT:
-      case CHAR:
-      case PROTOBUF:
-        return new SliceVector();
-
-      default:
-        throw new TajoRuntimeException(new NotImplementedException(type.getType().name() + " for orc"));
-    }
-  }
-
   private FileSystem fs;
   private FSDataInputStream fis;
 
@@ -108,6 +89,10 @@ public class ORCScanner extends FileScanner {
   @Override
   public void init() throws IOException {
     OrcReader orcReader;
+    DataSize maxMergeDistance = new DataSize(Double.parseDouble(meta.getProperty(StorageConstants.ORC_MAX_MERGE_DISTANCE,
+            StorageConstants.DEFAULT_ORC_MAX_MERGE_DISTANCE)), DataSize.Unit.BYTE);
+    DataSize maxReadSize = new DataSize(Double.parseDouble(meta.getProperty(StorageConstants.ORC_MAX_READ_BUFFER_SIZE,
+        StorageConstants.DEFAULT_ORC_MAX_READ_BUFFER_SIZE)), DataSize.Unit.BYTE);
 
     if (targets == null) {
       targets = schema.toArray();
@@ -129,8 +114,8 @@ public class ORCScanner extends FileScanner {
         this.fragment.getPath().toString(),
         fis,
         fs.getFileStatus(path).getLen(),
-        Integer.parseInt(meta.getProperty(StorageConstants.ORC_MAX_MERGE_DISTANCE,
-          StorageConstants.DEFAULT_ORC_MAX_MERGE_DISTANCE)));
+        maxMergeDistance,
+        maxReadSize);
 
     targetColInfo = new ColumnInfo[targets.length];
     for (int i=0; i<targets.length; i++) {
@@ -140,26 +125,23 @@ public class ORCScanner extends FileScanner {
       targetColInfo[i] = cinfo;
     }
 
-    // creating vectors for buffering
-    vectors = new Vector[targetColInfo.length];
-    for (int i=0; i<targetColInfo.length; i++) {
-      vectors[i] = createOrcVector(targetColInfo[i].type);
-    }
+    // creating blocks for buffering
+    blocks = new Block[targetColInfo.length];
 
-    Set<Integer> columnSet = new HashSet<>();
+    Map<Integer, Type> columnMap = new HashMap<>();
     for (ColumnInfo colInfo: targetColInfo) {
-      columnSet.add(colInfo.id);
+      columnMap.put(colInfo.id, createFBtypeByTajoType(colInfo.type));
     }
 
-    orcReader = new OrcReader(orcDataSource, new OrcMetadataReader());
+    orcReader = new OrcReader(orcDataSource, new OrcMetadataReader(), maxMergeDistance, maxReadSize);
 
     TimeZone timezone = TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE,
       TajoConstants.DEFAULT_SYSTEM_TIMEZONE));
 
     // TODO: make OrcPredicate useful
     // presto-orc uses joda timezone, so it needs to be converted.
-    recordReader = orcReader.createRecordReader(columnSet, OrcPredicate.TRUE,
-        fragment.getStartKey(), fragment.getLength(), DateTimeZone.forTimeZone(timezone));
+    recordReader = orcReader.createRecordReader(columnMap, OrcPredicate.TRUE,
+        fragment.getStartKey(), fragment.getLength(), DateTimeZone.forTimeZone(timezone), aggrMemoryContext);
 
     super.init();
     LOG.debug("file fragment { path: " + fragment.getPath() +
@@ -179,7 +161,7 @@ public class ORCScanner extends FileScanner {
     }
 
     for (int i=0; i<targetColInfo.length; i++) {
-      outTuple.put(i, createValueDatum(vectors[i], targetColInfo[i].type));
+      outTuple.put(i, createValueDatum(blocks[i], targetColInfo[i].type));
     }
 
     currentPosInBatch++;
@@ -187,95 +169,101 @@ public class ORCScanner extends FileScanner {
     return outTuple;
   }
 
+  private Type createFBtypeByTajoType(TajoDataTypes.DataType type) {
+    switch(type.getType()) {
+      case BOOLEAN:
+        return BooleanType.BOOLEAN;
+
+      case INT1:
+      case INT2:
+      case INT4:
+      case INT8:
+      case INET4:
+      case NULL_TYPE: // meaningless
+        return BigintType.BIGINT;
+
+      case TIMESTAMP:
+        return TimestampType.TIMESTAMP;
+
+      case DATE:
+        return DateType.DATE;
+
+      case FLOAT4:
+      case FLOAT8:
+        return DoubleType.DOUBLE;
+
+      case CHAR:
+      case TEXT:
+        return VarcharType.VARCHAR;
+
+      case BLOB:
+      case PROTOBUF:
+        return VarbinaryType.VARBINARY;
+
+      default:
+        throw new TajoRuntimeException(new NotImplementedException(type.getType().name() + " for orc"));
+    }
+  }
+
   // TODO: support more types
-  private Datum createValueDatum(Vector vector, TajoDataTypes.DataType type) {
+  private Datum createValueDatum(Block block, TajoDataTypes.DataType type) {
+    if (block.isNull(currentPosInBatch))
+      return NullDatum.get();
+
+    // NOTE: block.get***() methods are determined by the type size, which is set in createFBtypeByTajoType()
     switch (type.getType()) {
       case INT1:
-      case INT2:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
+        return DatumFactory.createInt2((short)block.getLong(currentPosInBatch, 0));
 
-        return DatumFactory.createInt2((short) ((LongVector) vector).vector[currentPosInBatch]);
+      case INT2:
+        return DatumFactory.createInt2((short)block.getLong(currentPosInBatch, 0));
 
       case INT4:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return DatumFactory.createInt4((int) ((LongVector) vector).vector[currentPosInBatch]);
+        return DatumFactory.createInt4((int)block.getLong(currentPosInBatch, 0));
 
       case INT8:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return DatumFactory.createInt8(((LongVector) vector).vector[currentPosInBatch]);
+        return DatumFactory.createInt8(block.getLong(currentPosInBatch, 0));
 
       case FLOAT4:
-        if (((DoubleVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return DatumFactory.createFloat4((float) ((DoubleVector) vector).vector[currentPosInBatch]);
+        return DatumFactory.createFloat4((float)block.getDouble(currentPosInBatch, 0));
 
       case FLOAT8:
-        if (((DoubleVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return DatumFactory.createFloat8(((DoubleVector) vector).vector[currentPosInBatch]);
+        return DatumFactory.createFloat8(block.getDouble(currentPosInBatch, 0));
 
       case BOOLEAN:
-        if (((BooleanVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return ((BooleanVector) vector).vector[currentPosInBatch] ? BooleanDatum.TRUE : BooleanDatum.FALSE;
+        return DatumFactory.createBool(block.getByte(currentPosInBatch, 0) != 0);
 
       case CHAR:
-        if (((SliceVector) vector).vector[currentPosInBatch] == null)
-          return NullDatum.get();
-
-        return DatumFactory.createChar(((SliceVector) vector).vector[currentPosInBatch].toStringUtf8());
+        return DatumFactory.createChar(block.getSlice(currentPosInBatch, 0,
+            block.getLength(currentPosInBatch)).getBytes());
 
       case TEXT:
-        if (((SliceVector) vector).vector[currentPosInBatch] == null)
-          return NullDatum.get();
-
-        return DatumFactory.createText(((SliceVector) vector).vector[currentPosInBatch].getBytes());
+        return DatumFactory.createText(block.getSlice(currentPosInBatch, 0,
+            block.getLength(currentPosInBatch)).getBytes());
 
       case BLOB:
-        if (((SliceVector) vector).vector[currentPosInBatch] == null)
-          return NullDatum.get();
-
-        return DatumFactory.createBlob(((SliceVector) vector).vector[currentPosInBatch].getBytes());
+        return DatumFactory.createBlob(block.getSlice(currentPosInBatch, 0,
+            block.getLength(currentPosInBatch)).getBytes());
 
       case PROTOBUF:
         try {
-          if (((SliceVector) vector).vector[currentPosInBatch] == null)
-            return NullDatum.get();
-
-          return ProtobufDatumFactory.createDatum(type,
-            ((SliceVector) vector).vector[currentPosInBatch].getBytes());
+          return ProtobufDatumFactory.createDatum(type, block.getSlice(currentPosInBatch, 0,
+              block.getLength(currentPosInBatch)).getBytes());
         } catch (InvalidProtocolBufferException e) {
           LOG.error("ERROR", e);
           return NullDatum.get();
         }
 
       case TIMESTAMP:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
         return DatumFactory.createTimestamp(
-          DateTimeUtil.javaTimeToJulianTime(((LongVector) vector).vector[currentPosInBatch]));
+            DateTimeUtil.javaTimeToJulianTime(block.getLong(currentPosInBatch, 0)));
 
       case DATE:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
         return DatumFactory.createDate(
-          (int) ((LongVector) vector).vector[currentPosInBatch] + DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH);
+            block.getInt(currentPosInBatch, 0) + DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH);
 
       case INET4:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return DatumFactory.createInet4((int) ((LongVector) vector).vector[currentPosInBatch]);
+        return DatumFactory.createInet4((int)block.getLong(currentPosInBatch, 0));
 
       case NULL_TYPE:
         return NullDatum.get();
@@ -286,7 +274,7 @@ public class ORCScanner extends FileScanner {
   }
 
   /**
-   * Fetch next batch from ORC file to vectors as many as batch size
+   * Fetch the next batch from the ORC file into the block buffers, up to the batch size
    *
    * @throws IOException
    */
@@ -298,7 +286,7 @@ public class ORCScanner extends FileScanner {
       return;
 
     for (int i=0; i<targetColInfo.length; i++) {
-      recordReader.readVector(targetColInfo[i].id, vectors[i]);
+      blocks[i] = recordReader.readBlock(createFBtypeByTajoType(targetColInfo[i].type), targetColInfo[i].id);
     }
 
     currentPosInBatch = 0;
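
As a companion to the scanner diff above, the following is a minimal, hedged sketch of the presto-orc 0.132 read path the new ORCScanner follows (OrcDataSource -> OrcReader -> OrcRecordReader -> Block). It assumes a single BIGINT column with ORC column id 0, that dataSource, maxMergeDistance and maxReadSize are set up as in init(), and that OrcRecordReader.nextBatch() returns the row count of the next batch; treat it as an illustration rather than the committed code:

  import com.facebook.presto.orc.*;
  import com.facebook.presto.orc.memory.AggregatedMemoryContext;
  import com.facebook.presto.orc.metadata.OrcMetadataReader;
  import com.facebook.presto.spi.block.Block;
  import com.facebook.presto.spi.type.BigintType;
  import com.facebook.presto.spi.type.Type;
  import org.joda.time.DateTimeZone;
  import java.util.HashMap;
  import java.util.Map;

  OrcReader reader = new OrcReader(dataSource, new OrcMetadataReader(), maxMergeDistance, maxReadSize);

  Map<Integer, Type> columns = new HashMap<>();            // ORC column id -> Presto type
  columns.put(0, BigintType.BIGINT);

  OrcRecordReader rows = reader.createRecordReader(columns, OrcPredicate.TRUE,
      0, dataSource.getSize(), DateTimeZone.UTC, new AggregatedMemoryContext());

  int batchSize;
  while ((batchSize = rows.nextBatch()) > 0) {             // assumed batch API, see note above
    Block block = rows.readBlock(BigintType.BIGINT, 0);    // one Block per column per batch
    for (int row = 0; row < batchSize; row++) {
      if (!block.isNull(row)) {
        long value = block.getLong(row, 0);                // value access mirrors createValueDatum()
      }
    }
  }
  rows.close();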

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
index e85913e..7521fa3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
@@ -60,6 +60,11 @@ public class TajoStructObjectInspector extends StructObjectInspector {
     }
 
     @Override
+    public int getFieldID() {
+      return 0;
+    }
+
+    @Override
     public String getFieldComment() {
       return comment;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java
deleted file mode 100644
index de7c802..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import com.facebook.presto.orc.DiskRange;
-import com.facebook.presto.orc.OrcDataSource;
-import com.google.common.collect.ImmutableMap;
-import io.airlift.slice.Slice;
-import io.airlift.units.DataSize;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice;
-import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
-
-/**
- * File data source class for Orc Reader
- *
- * Most of code is from Presto
- */
-public class FileOrcDataSource
-        implements OrcDataSource
-{
-    private final File path;
-    private final long size;
-    private final RandomAccessFile input;
-    private final DataSize maxMergeDistance;
-    private long readTimeNanos;
-
-    public FileOrcDataSource(File path, double mergeDistance)
-            throws IOException
-    {
-        this.path = checkNotNull(path, "path is null");
-        this.size = path.length();
-        this.input = new RandomAccessFile(path, "r");
-
-        maxMergeDistance = new DataSize(mergeDistance, DataSize.Unit.BYTE);
-    }
-
-    @Override
-    public void close()
-            throws IOException
-    {
-        input.close();
-    }
-
-    @Override
-    public long getReadTimeNanos()
-    {
-        return readTimeNanos;
-    }
-
-    @Override
-    public long getSize()
-    {
-        return size;
-    }
-
-    @Override
-    public void readFully(long position, byte[] buffer)
-            throws IOException
-    {
-        readFully(position, buffer, 0, buffer.length);
-    }
-
-    @Override
-    public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
-            throws IOException
-    {
-        long start = System.nanoTime();
-
-        input.seek(position);
-        input.readFully(buffer, bufferOffset, bufferLength);
-
-        readTimeNanos += System.nanoTime() - start;
-    }
-
-    @Override
-    public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges)
-            throws IOException
-    {
-        checkNotNull(diskRanges, "diskRanges is null");
-
-        if (diskRanges.isEmpty()) {
-            return ImmutableMap.of();
-        }
-
-        // TODO: benchmark alternatively strategies:
-        // 1) sort ranges and perform one read per range
-        // 2) single read with transferTo() using custom WritableByteChannel
-
-        Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance);
-
-        // read ranges
-        Map<DiskRange, byte[]> buffers = new LinkedHashMap<>();
-        for (DiskRange mergedRange : mergedRanges) {
-            // read full range in one request
-            byte[] buffer = new byte[mergedRange.getLength()];
-            readFully(mergedRange.getOffset(), buffer);
-            buffers.put(mergedRange, buffer);
-        }
-
-        ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder();
-        for (Entry<K, DiskRange> entry : diskRanges.entrySet()) {
-            slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers));
-        }
-        return slices.build();
-    }
-
-    @Override
-    public String toString()
-    {
-        return path.getPath();
-    }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
index da12461..5357f51 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
@@ -17,7 +17,8 @@ package org.apache.tajo.storage.thirdparty.orc;
 import com.facebook.presto.orc.DiskRange;
 import com.facebook.presto.orc.OrcDataSource;
 import com.google.common.collect.ImmutableMap;
-import io.airlift.slice.Slice;
+import io.airlift.slice.BasicSliceInput;
+import io.airlift.slice.FixedLengthSliceInput;
 import io.airlift.units.DataSize;
 import org.apache.hadoop.fs.FSDataInputStream;
 
@@ -43,17 +44,19 @@ public class HdfsOrcDataSource
   private final String path;
   private final long size;
   private final DataSize maxMergeDistance;
+  private final DataSize maxReadSize;
   private long readTimeNanos;
 
-  public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size, double maxMergeDistance)
+  public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size,
+                           DataSize maxMergeDistance, DataSize maxReadSize)
   {
     this.path = checkNotNull(path, "path is null");
     this.inputStream = checkNotNull(inputStream, "inputStream is null");
     this.size = size;
     checkArgument(size >= 0, "size is negative");
 
-    DataSize mergeDistance = new DataSize(maxMergeDistance, DataSize.Unit.BYTE);
-    this.maxMergeDistance = checkNotNull(mergeDistance, "maxMergeDistance is null");
+    this.maxMergeDistance = checkNotNull(maxMergeDistance, "maxMergeDistance is null");
+    this.maxReadSize = checkNotNull(maxReadSize, "maxReadSize is null");
   }
 
   @Override
@@ -89,12 +92,11 @@ public class HdfsOrcDataSource
     long start = System.nanoTime();
 
     inputStream.readFully(position, buffer, bufferOffset, bufferLength);
-
     readTimeNanos += System.nanoTime() - start;
   }
 
   @Override
-  public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges)
+  public <K> Map<K, FixedLengthSliceInput> readFully(Map<K, DiskRange> diskRanges)
     throws IOException
   {
     checkNotNull(diskRanges, "diskRanges is null");
@@ -103,7 +105,7 @@ public class HdfsOrcDataSource
       return ImmutableMap.of();
     }
 
-    Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance);
+    Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance, maxReadSize);
 
     // read ranges
     Map<DiskRange, byte[]> buffers = new LinkedHashMap<>();
@@ -114,10 +116,10 @@ public class HdfsOrcDataSource
       buffers.put(mergedRange, buffer);
     }
 
-    ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder();
-    for (Entry<K, DiskRange> entry : diskRanges.entrySet()) {
-      slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers));
-    }
+    ImmutableMap.Builder<K, FixedLengthSliceInput> slices = ImmutableMap.builder();
+    diskRanges.forEach((K key, DiskRange range) ->
+        slices.put(key, new BasicSliceInput(getDiskRangeSlice(range, buffers))));
+
     return slices.build();
   }
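
To tie the new data source signature back to the scanner, a small sketch of constructing HdfsOrcDataSource with the two DataSize arguments; fs and path are assumed to be the Hadoop FileSystem and Path used in ORCScanner.init(), and the sizes shown are simply the documented defaults:

  // Assumed setup, mirroring ORCScanner.init(); the numeric sizes are the defaults from StorageConstants.
  FSDataInputStream in = fs.open(path);
  OrcDataSource source = new HdfsOrcDataSource(
      path.toString(),
      in,
      fs.getFileStatus(path).getLen(),
      new DataSize(1048576, DataSize.Unit.BYTE),   // orc.max.merge.distance default (1MB)
      new DataSize(8388608, DataSize.Unit.BYTE));  // orc.max.read.buffer default (8MB)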
 

