tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [14/15] tajo git commit: TAJO-1152: RawFile ByteBuffer should be reuse. (jinho)
Date Wed, 12 Nov 2014 06:40:58 GMT
TAJO-1152: RawFile ByteBuffer should be reuse. (jinho)

Closes #224


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/55084a8a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/55084a8a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/55084a8a

Branch: refs/heads/hbase_storage
Commit: 55084a8ae29a3650409c1f1dea68223d6f1c340d
Parents: 0055568
Author: jhkim <jhkim@apache.org>
Authored: Wed Nov 12 11:31:33 2014 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Wed Nov 12 11:31:33 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/storage/RawFile.java   | 184 ++++++++++---------
 .../org/apache/tajo/storage/TestStorages.java   |  16 +-
 3 files changed, 108 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/55084a8a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 1337f87..7f5ec76 100644
--- a/CHANGES
+++ b/CHANGES
@@ -75,6 +75,8 @@ Release 0.9.1 - unreleased
 
   SUB TASKS
 
+    TAJO-1152: RawFile ByteBuffer should be reuse. (jinho)
+
     TAJO-1149: Implement direct read of DelimitedTextFile. (jinho)
 
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/55084a8a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index c8ac3a2..2fae243 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.storage;
 
 import com.google.protobuf.Message;
-
+import io.netty.buffer.ByteBuf;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -51,17 +51,20 @@ public class RawFile {
     private DataType[] columnTypes;
 
     private ByteBuffer buffer;
-    private int bufferSize;
+    private ByteBuf buf;
     private Tuple tuple;
 
     private int headerSize = 0; // Header size of a tuple
     private BitArray nullFlags;
     private static final int RECORD_SIZE = 4;
-    private boolean eof = false;
-    private long fileLimit; // If this.fragment represents a complete file, this value is equal to the file's size
-    private long numBytesRead;
+    private boolean eos = false;
+    private long startOffset;
+    private long endOffset;
     private FileInputStream fis;
     private long recordCount;
+    private long totalReadBytes;
+    private long filePosition;
+    private boolean forceFillBuffer;
 
     public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
       super(conf, schema, meta, fragment);
@@ -81,22 +84,16 @@ public class RawFile {
 
       fis = new FileInputStream(file);
       channel = fis.getChannel();
-      fileLimit = fragment.getStartKey() + fragment.getEndKey(); // fileLimit is less than or equal to fileSize
+      filePosition = startOffset = fragment.getStartKey();
+      endOffset = fragment.getStartKey() + fragment.getEndKey();
 
-      if (tableStats != null) {
-        tableStats.setNumBytes(fragment.getEndKey());
-      }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", total file size :" + channel.size()
-            + ", fragment size :" + fragment.getEndKey() + ", fileLimit: " + fileLimit);
+        LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size()
+            + ", fragment length :" + fragment.getEndKey());
       }
 
-      if (fragment.getEndKey() < 64 * StorageUnit.KB) {
-	      bufferSize = fragment.getEndKey().intValue();
-      } else {
-	      bufferSize = 64 * StorageUnit.KB;
-      }
-      buffer = ByteBuffer.allocateDirect(bufferSize);
+      buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+      buffer = buf.nioBuffer(0, buf.capacity());
 
       columnTypes = new DataType[schema.size()];
       for (int i = 0; i < schema.size(); i++) {
@@ -107,58 +104,53 @@ public class RawFile {
       nullFlags = new BitArray(schema.size());
       headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize
 
-      // initial read
+      // initial set position
       if (fragment.getStartKey() > 0) {
-	channel.position(fragment.getStartKey());
+        channel.position(fragment.getStartKey());
       }
-      numBytesRead = channel.read(buffer);
-      buffer.flip();
 
+      forceFillBuffer = true;
       super.init();
     }
 
     @Override
     public long getNextOffset() throws IOException {
-      return channel.position() - buffer.remaining();
+      return filePosition - (forceFillBuffer ? 0 : buffer.remaining());
     }
 
     @Override
     public void seek(long offset) throws IOException {
-      long currentPos = channel.position();
-      if(currentPos < offset &&  offset < currentPos + buffer.limit()){
-        buffer.position((int)(offset - currentPos));
+      eos = false;
+      filePosition = channel.position();
+
+      // do not fill the buffer if the offset is already included in the buffer.
+      if(!forceFillBuffer && filePosition > offset && offset > filePosition - buffer.limit()){
+        buffer.position((int)(offset - (filePosition - buffer.limit())));
       } else {
-        buffer.clear();
+        if(offset < startOffset || offset > startOffset + fragment.getEndKey()){
+          throw new IndexOutOfBoundsException(String.format("range(%d, %d), offset: %d",
+              startOffset, startOffset + fragment.getEndKey(), offset));
+        }
         channel.position(offset);
-        int bytesRead = channel.read(buffer);
-        numBytesRead = bytesRead;
-        buffer.flip();
-        eof = false;
+        filePosition = offset;
+        buffer.clear();
+        forceFillBuffer = true;
+        fillBuffer();
       }
     }
 
     private boolean fillBuffer() throws IOException {
-      if (numBytesRead >= fragment.getEndKey()) {
-        eof = true;
-        return false;
-      }
-      int currentDataSize = buffer.remaining();
-      buffer.compact();
+      if(!forceFillBuffer) buffer.compact();
+
       int bytesRead = channel.read(buffer);
+      forceFillBuffer = false;
       if (bytesRead == -1) {
-        eof = true;
+        eos = true;
         return false;
       } else {
-        buffer.flip();
-        long realRemaining = fragment.getEndKey() - numBytesRead;
-        numBytesRead += bytesRead;
-        if (realRemaining < bufferSize) {
-          int newLimit = currentDataSize + (int) realRemaining;
-          if(newLimit > bufferSize) {
-            newLimit = bufferSize;
-          }
-          buffer.limit(newLimit);
-        }
+        buffer.flip(); //The limit is set to the current filePosition and then the filePosition is set to zero
+        filePosition += bytesRead;
+        totalReadBytes += bytesRead;
         return true;
       }
     }
@@ -247,9 +239,9 @@ public class RawFile {
 
     @Override
     public Tuple next() throws IOException {
-      if(eof) return null;
+      if(eos) return null;
 
-      if (buffer.remaining() < headerSize) {
+      if (forceFillBuffer || buffer.remaining() < headerSize) {
         if (!fillBuffer()) {
           return null;
         }
@@ -264,15 +256,16 @@ public class RawFile {
       nullFlags.fromByteBuffer(buffer);
       // restore the start of record contents
       buffer.limit(bufferLimit);
-      //buffer.position(recordOffset + headerSize);
       if (buffer.remaining() < (recordSize - headerSize)) {
+
+        //if the buffer reaches the writable size, the buffer increase the record size
+        reSizeBuffer(recordSize);
+
         if (!fillBuffer()) {
           return null;
         }
       }
 
-      recordCount++;
-
       for (int i = 0; i < columnTypes.length; i++) {
         // check if the i'th column is null
         if (nullFlags.get(i)) {
@@ -320,7 +313,7 @@ public class RawFile {
             int len = readRawVarint32();
             byte [] strBytes = new byte[len];
             buffer.get(strBytes);
-            tuple.put(i, DatumFactory.createText(new String(strBytes)));
+            tuple.put(i, DatumFactory.createText(strBytes));
             break;
           }
 
@@ -377,31 +370,45 @@ public class RawFile {
         }
       }
 
-      if(!buffer.hasRemaining() && channel.position() == fileLimit){
-        eof = true;
+      recordCount++;
+
+      if(filePosition - buffer.remaining() >= endOffset){
+        eos = true;
       }
       return new VTuple(tuple);
     }
 
+    private void reSizeBuffer(int writableBytes){
+      if (buffer.capacity() - buffer.remaining()  <  writableBytes) {
+        buf.setIndex(buffer.position(), buffer.limit());
+        buf.markReaderIndex();
+        buf.discardSomeReadBytes();
+        buf.ensureWritable(writableBytes);
+        buffer = buf.nioBuffer(0, buf.capacity());
+        buffer.limit(buf.writerIndex());
+      }
+    }
+
     @Override
     public void reset() throws IOException {
-      // clear the buffer
+      // reset the buffer
       buffer.clear();
-      // reload initial buffer
-      channel.position(fragment.getStartKey());
-      numBytesRead = channel.read(buffer);
-      buffer.flip();
-      eof = false;
+      forceFillBuffer = true;
+      filePosition = fragment.getStartKey();
+      channel.position(filePosition);
+      eos = false;
     }
 
     @Override
     public void close() throws IOException {
-      if (tableStats != null) {
-        tableStats.setReadBytes(fragment.getEndKey());
-        tableStats.setNumRows(recordCount);
+      if(buf != null){
+        buffer.clear();
+        buffer = null;
+
+        buf.release();
+        buf = null;
       }
 
-      StorageUtil.closeBuffer(buffer);
       IOUtils.cleanup(LOG, channel, fis);
     }
 
@@ -421,28 +428,25 @@ public class RawFile {
     }
 
     @Override
-    public float getProgress() {
-      try {
+    public TableStats getInputStats() {
+      if(tableStats != null){
         tableStats.setNumRows(recordCount);
-        long filePos = 0;
-        if (channel != null) {
-          filePos = channel.position();
-          tableStats.setReadBytes(filePos);
-        }
+        tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan + rescan * n)
+        tableStats.setNumBytes(fragment.getEndKey());
+      }
+      return tableStats;
+    }
 
-        if(eof || channel == null) {
-          tableStats.setReadBytes(fragment.getEndKey());
-          return 1.0f;
-        }
+    @Override
+    public float getProgress() {
+      if(eos) {
+        return 1.0f;
+      }
 
-        if (filePos == 0) {
-          return 0.0f;
-        } else {
-          return Math.min(1.0f, ((float)filePos / fragment.getEndKey().floatValue()));
-        }
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
+      if (filePosition - startOffset == 0) {
         return 0.0f;
+      } else {
+        return Math.min(1.0f, ((float) filePosition / endOffset));
       }
     }
   }
@@ -453,6 +457,7 @@ public class RawFile {
     private DataType[] columnTypes;
 
     private ByteBuffer buffer;
+    private ByteBuf buf;
     private BitArray nullFlags;
     private int headerSize = 0;
     private static final int RECORD_SIZE = 4;
@@ -485,7 +490,8 @@ public class RawFile {
         columnTypes[i] = schema.getColumn(i).getDataType();
       }
 
-      buffer = ByteBuffer.allocateDirect(64 * 1024);
+      buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+      buffer = buf.nioBuffer(0, buf.capacity());
 
       // comput the number of bytes, representing the null flags
 
@@ -505,7 +511,6 @@ public class RawFile {
     }
 
     private void flushBuffer() throws IOException {
-      buffer.limit(buffer.position());
       buffer.flip();
       channel.write(buffer);
       buffer.clear();
@@ -743,7 +748,14 @@ public class RawFile {
         LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
       }
 
-      StorageUtil.closeBuffer(buffer);
+      if(buf != null){
+        buffer.clear();
+        buffer = null;
+
+        buf.release();
+        buf = null;
+      }
+
       IOUtils.cleanup(LOG, channel, randomAccessFile);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/55084a8a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 56cef77..a3f80cf 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -123,7 +123,7 @@ public class TestStorages {
     return Arrays.asList(new Object[][] {
         //type, splitable, statsable, seekable
         {StoreType.CSV, true, true, true},
-        {StoreType.RAW, false, false, true},
+        {StoreType.RAW, false, true, true},
         {StoreType.RCFILE, true, true, false},
         {StoreType.PARQUET, false, false, false},
         {StoreType.SEQUENCEFILE, true, true, false},
@@ -792,7 +792,7 @@ public class TestStorages {
     TableMeta meta = CatalogUtil.newTableMeta(storeType);
     Path tablePath = new Path(testDir, "Seekable.data");
     FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema,
-	tablePath);
+        tablePath);
     appender.enableStats();
     appender.init();
     int tupleNum = 100000;
@@ -804,12 +804,12 @@ public class TestStorages {
       vTuple = new VTuple(3);
       vTuple.put(0, DatumFactory.createInt4(i + 1));
       vTuple.put(1, DatumFactory.createInt8(25l));
-      vTuple.put(2, DatumFactory.createText("test"));
+      vTuple.put(2, DatumFactory.createText("test" + i));
       appender.addTuple(vTuple);
 
       // find a seek position
       if (i % (tupleNum / 3) == 0) {
-	offsets.add(appender.getOffset());
+        offsets.add(appender.getOffset());
       }
     }
 
@@ -834,17 +834,17 @@ public class TestStorages {
     long readRows = 0;
     for (long offset : offsets) {
       scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema,
-	  new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
+          new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
       scanner.init();
 
       while (scanner.next() != null) {
-	tupleCnt++;
+        tupleCnt++;
       }
 
       scanner.close();
       if (statsable) {
-	readBytes += scanner.getInputStats().getNumBytes();
-	readRows += scanner.getInputStats().getNumRows();
+        readBytes += scanner.getInputStats().getNumBytes();
+        readRows += scanner.getInputStats().getNumRows();
       }
       prevOffset = offset;
     }


Mime
View raw message