tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [3/3] git commit: TAJO-223: Maximize disk read bandwidth utilization of StorageManagerV2 by moving Tuple creation role to next() (Keuntae Park via hyunsik)
Date Fri, 04 Oct 2013 09:53:08 GMT
TAJO-223: Maximize disk read bandwidth utilization of StorageManagerV2 by moving Tuple creation
role to next() (Keuntae Park via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/4449d9c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/4449d9c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/4449d9c4

Branch: refs/heads/master
Commit: 4449d9c48e9d0cdca7fa4a523fe4f9f389404f87
Parents: 3f61b97
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Fri Oct 4 18:39:26 2013 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Fri Oct 4 18:39:26 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |    3 +
 .../java/org/apache/tajo/client/TajoClient.java |    1 +
 .../apache/tajo/storage/v2/CSVFileScanner.java  |  241 +--
 .../tajo/storage/v2/DiskFileScanScheduler.java  |   69 +-
 .../apache/tajo/storage/v2/FileScannerV2.java   |  137 +-
 .../java/org/apache/tajo/storage/v2/RCFile.java | 1812 ++++++++++++++++++
 .../apache/tajo/storage/v2/RCFileScanner.java   |  104 +-
 .../apache/tajo/storage/v2/ScanScheduler.java   |   30 +
 .../tajo/storage/v2/ScheduledInputStream.java   |  513 +++++
 .../tajo/storage/v2/StorageManagerV2.java       |    7 +-
 .../apache/tajo/storage/index/TestBSTIndex.java |    9 +-
 .../tajo/storage/v2/TestCSVCompression.java     |  210 ++
 .../apache/tajo/storage/v2/TestCSVScanner.java  |  166 ++
 .../apache/tajo/storage/v2/TestStorages.java    |  240 +++
 14 files changed, 3288 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4449d9c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f879d0c..f57ac21 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,9 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-223: Maximize disk read bandwidth utilization of StorageManagerV2 by
+    moving Tuple creation role to next(). (Keuntae Park via hyunsik)
+
     TAJO-199: All relations in catalog must have data volume size. (hyunsik)
 
     TAJO-224: Rearrange DataType enumeration and Refactor type systems.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4449d9c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index 068ece5..e030b86 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -65,6 +65,7 @@ public class TajoClient {
 
   public TajoClient(TajoConf conf) throws IOException {
     this.conf = conf;
+    this.conf.set("tajo.disk.scheduler.report.interval", "0");
     String masterAddr = this.conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS);
     InetSocketAddress addr = NetUtils.createSocketAddr(masterAddr);
     connect(addr);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4449d9c4/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
index 7b40a46..4529873 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
@@ -18,32 +18,24 @@
 
 package org.apache.tajo.storage.v2;
 
-import com.google.protobuf.Message;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.util.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.ProtobufDatumFactory;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.datum.protobuf.TextUtils;
 import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.LazyTuple;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.util.Bytes;
 
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
 public class CSVFileScanner extends FileScannerV2 {
   public static final String DELIMITER = "csvfile.delimiter";
   public static final String DELIMITER_DEFAULT = "|";
@@ -53,8 +45,8 @@ public class CSVFileScanner extends FileScannerV2 {
   private final static int DEFAULT_BUFFER_SIZE = 256 * 1024;
   private int bufSize;
   private char delimiter;
-  private FSDataInputStream fis;
-  private InputStream is; //decompressd stream
+  private ScheduledInputStream sin;
+  private InputStream is; // decompressd stream
   private CompressionCodecFactory factory;
   private CompressionCodec codec;
   private Decompressor decompressor;
@@ -62,7 +54,7 @@ public class CSVFileScanner extends FileScannerV2 {
   private boolean splittable = true;
   private long startOffset, length;
   private byte[] buf = null;
-  private String[] tuples = null;
+  private byte[][] tuples = null;
   private long[] tupleOffsets = null;
   private int currentIdx = 0, validIdx = 0;
   private byte[] tail = null;
@@ -70,6 +62,10 @@ public class CSVFileScanner extends FileScannerV2 {
   private long prevTailLen = -1;
   private int[] targetColumnIndexes;
   private boolean eof = false;
+  private boolean first = true;
+
+  private long totalReadBytesForFetch;
+  private long totalReadBytesFromDisk;
 
   public CSVFileScanner(Configuration conf, final TableMeta meta,
                     final Fragment fragment) throws IOException {
@@ -92,26 +88,31 @@ public class CSVFileScanner extends FileScannerV2 {
   }
 
   @Override
-  protected void initFirstScan() throws IOException {
-    if(!firstSchdeuled) {
-      return;
-    }
-    firstSchdeuled = false;
-
-    // Fragment information
-    fis = fs.open(fragment.getPath(), 128 * 1024);
-    startOffset = fragment.getStartOffset();
-    length = fragment.getLength();
-
-    if (startOffset > 0) {
-      startOffset--; // prev line feed
+  protected boolean initFirstScan(int maxBytesPerSchedule) throws IOException {
+    synchronized(this) {
+      eof = false;
+      first = true;
+      if(sin == null) {
+        FSDataInputStream fin = fs.open(fragment.getPath(), 128 * 1024);
+        sin = new ScheduledInputStream(fragment.getPath(), fin,
+            fragment.getStartOffset(), fragment.getLength(), fs.getLength(fragment.getPath()));
+        startOffset = fragment.getStartOffset();
+        length = fragment.getLength();
+
+        if (startOffset > 0) {
+          startOffset--; // prev line feed
+        }
+      }
     }
+    return true;
+  }
 
+  private boolean scanFirst() throws IOException {
     if (codec != null) {
       decompressor = CodecPool.getDecompressor(codec);
       if (codec instanceof SplittableCompressionCodec) {
         SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
-            fis, decompressor, startOffset, startOffset + length,
+            sin, decompressor, startOffset, startOffset + length,
             SplittableCompressionCodec.READ_MODE.BYBLOCK);
 
         startOffset = cIn.getAdjustedStart();
@@ -119,15 +120,15 @@ public class CSVFileScanner extends FileScannerV2 {
         filePosition = cIn;
         is = cIn;
       } else {
-        is = new DataInputStream(codec.createInputStream(fis, decompressor));
+        is = new DataInputStream(codec.createInputStream(sin, decompressor));
       }
     } else {
-      fis.seek(startOffset);
-      filePosition = fis;
-      is = fis;
+      sin.seek(startOffset);
+      filePosition = sin;
+      is = sin;
     }
 
-    tuples = new String[0];
+    tuples = new byte[0][];
     if (targets == null) {
       targets = schema.toArray();
     }
@@ -151,9 +152,18 @@ public class CSVFileScanner extends FileScannerV2 {
 
     if (fragmentable() < 1) {
       close();
-      return;
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean isStopScanScheduling() {
+    if(sin != null && sin.IsEndOfStream()) {
+      return true;
+    } else {
+      return false;
     }
-    page();
   }
 
   private long fragmentable() throws IOException {
@@ -166,11 +176,21 @@ public class CSVFileScanner extends FileScannerV2 {
     if (filePosition != null) {
       retVal = filePosition.getPos();
     } else {
-      retVal = fis.getPos();
+      retVal = sin.getPos();
     }
     return retVal;
   }
 
+  @Override
+  public boolean isFetchProcessing() {
+    if(sin != null &&
+        (sin.getAvaliableSize() >= 64 * 1024 * 1024)) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   private void page() throws IOException {
     // Index initialization
     currentIdx = 0;
@@ -200,34 +220,37 @@ public class CSVFileScanner extends FileScannerV2 {
 
     if (prevTailLen == 0) {
       tail = new byte[0];
-      tuples = StringUtils.split(new String(buf, 0, rbyte), (char) LF);
+      tuples = Bytes.splitPreserveAllTokens(buf, rbyte, (char) LF);
     } else {
-      tuples = StringUtils.split(new String(tail)
-          + new String(buf, 0, rbyte), (char) LF);
+      byte[] lastRow = ArrayUtils.addAll(tail, buf);
+      tuples = Bytes.splitPreserveAllTokens(lastRow, rbyte + tail.length, (char) LF);
       tail = null;
     }
 
     // Check tail
     if ((char) buf[rbyte - 1] != LF) {
       if ((fragmentable() < 1 || rbyte != bufSize)) {
-        int cnt = 0;
+        int lineFeedPos = 0;
         byte[] temp = new byte[DEFAULT_BUFFER_SIZE];
-        // Read bytes
-        while ((temp[cnt] = (byte) is.read()) != LF) {
-          cnt++;
+
+        // find line feed
+        while ((temp[lineFeedPos] = (byte)is.read()) != (byte)LF) {
+          if(temp[lineFeedPos] < 0) {
+            break;
+          }
+          lineFeedPos++;
         }
 
-        // Replace tuple
-        tuples[tuples.length - 1] = tuples[tuples.length - 1]
-            + new String(temp, 0, cnt);
+        tuples[tuples.length - 1] = ArrayUtils.addAll(tuples[tuples.length - 1],
+            ArrayUtils.subarray(temp, 0, lineFeedPos));
         validIdx = tuples.length;
       } else {
-        tail = tuples[tuples.length - 1].getBytes();
+        tail = tuples[tuples.length - 1];
         validIdx = tuples.length - 1;
       }
     } else {
       tail = new byte[0];
-      validIdx = tuples.length;
+      validIdx = tuples.length - 1;
     }
 
     if(!isCompress()) makeTupleOffset();
@@ -238,13 +261,18 @@ public class CSVFileScanner extends FileScannerV2 {
     this.tupleOffsets = new long[this.validIdx];
     for (int i = 0; i < this.validIdx; i++) {
       this.tupleOffsets[i] = curTupleOffset + this.pageStart;
-      curTupleOffset += this.tuples[i].getBytes().length + 1;// tuple byte
-      // + 1byte
-      // line feed
+      curTupleOffset += this.tuples[i].length + 1;//tuple byte +  1byte line feed
     }
   }
 
-  protected Tuple getNextTuple() throws IOException {
+  protected Tuple nextTuple() throws IOException {
+    if(first) {
+      boolean more = scanFirst();
+      first = false;
+      if(!more) {
+        return null;
+      }
+    }
     try {
       if (currentIdx == validIdx) {
         if (isSplittable() && fragmentable() < 1) {
@@ -265,71 +293,8 @@ public class CSVFileScanner extends FileScannerV2 {
         offset = this.tupleOffsets[currentIdx];
       }
 
-      String[] cells = StringUtils.splitPreserveAllTokens(tuples[currentIdx++], delimiter);
-
-      int targetLen = targets.length;
-
-      VTuple tuple = new VTuple(columnNum);
-      Column field;
-      tuple.setOffset(offset);
-      for (int i = 0; i < targetLen; i++) {
-        field = targets[i];
-        int tid = targetColumnIndexes[i];
-        if (cells.length <= tid) {
-          tuple.put(tid, DatumFactory.createNullDatum());
-        } else {
-          String cell = cells[tid].trim();
-
-          if (cell.equals("")) {
-            tuple.put(tid, DatumFactory.createNullDatum());
-          } else {
-            DataType dataType = field.getDataType();
-            switch (dataType.getType()) {
-              case BOOLEAN:
-                tuple.put(tid, DatumFactory.createBool(cell));
-                break;
-              case BIT:
-                tuple.put(tid, DatumFactory.createBit(Base64.decodeBase64(cell)[0]));
-                break;
-              case CHAR:
-                String trimmed = cell.trim();
-                tuple.put(tid, DatumFactory.createChar(trimmed));
-                break;
-              case BLOB:
-                tuple.put(tid, DatumFactory.createBlob(Base64.decodeBase64(cell)));
-                break;
-              case INT2:
-                tuple.put(tid, DatumFactory.createInt2(cell));
-                break;
-              case INT4:
-                tuple.put(tid, DatumFactory.createInt4(cell));
-                break;
-              case INT8:
-                tuple.put(tid, DatumFactory.createInt8(cell));
-                break;
-              case FLOAT4:
-                tuple.put(tid, DatumFactory.createFloat4(cell));
-                break;
-              case FLOAT8:
-                tuple.put(tid, DatumFactory.createFloat8(cell));
-                break;
-              case TEXT:
-                tuple.put(tid, DatumFactory.createText(cell));
-                break;
-              case PROTOBUF:
-                ProtobufDatumFactory factory = ProtobufDatumFactory.get(dataType);
-                Message.Builder builder = factory.newBuilder();
-                ProtobufJsonFormat.getInstance().merge(TextUtils.toInputStream(cell), builder);
-                tuple.put(tid, factory.createDatum(builder.build()));
-                break;
-              case INET4:
-                tuple.put(tid, DatumFactory.createInet4(cell));
-                break;
-            }
-          }
-        }
-      }
-      return tuple;
+      byte[][] cells = Bytes.splitPreserveAllTokens(tuples[currentIdx++], delimiter, targetColumnIndexes);
+      return new LazyTuple(schema, cells, offset);
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
     }
@@ -341,8 +306,22 @@ public class CSVFileScanner extends FileScannerV2 {
   }
 
   @Override
-  public void reset() throws IOException {
-    super.reset();
+  public void scannerReset() {
+    if(sin != null) {
+      try {
+        filePosition.seek(0);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+    if(sin != null) {
+      try {
+        sin.seek(0);
+        sin.reset();
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
   }
 
   @Override
@@ -350,8 +329,16 @@ public class CSVFileScanner extends FileScannerV2 {
     if(closed.get()) {
       return;
     }
+    if(sin != null) {
+      totalReadBytesForFetch = sin.getTotalReadBytesForFetch();
+      totalReadBytesFromDisk = sin.getTotalReadBytesFromDisk();
+    }
     try {
-      is.close();
+      if(is != null) {
+        is.close();
+      }
+      is = null;
+      sin = null;
     } finally {
       if (decompressor != null) {
         CodecPool.returnDecompressor(decompressor);
@@ -363,6 +350,16 @@ public class CSVFileScanner extends FileScannerV2 {
   }
 
   @Override
+  protected boolean scanNext(int length) throws IOException {
+    synchronized(this) {
+      if(isClosed()) {
+        return false;
+      }
+      return sin.readNext(length);
+    }
+  }
+
+  @Override
   public boolean isProjectable() {
     return true;
   }
@@ -381,4 +378,8 @@ public class CSVFileScanner extends FileScannerV2 {
     return splittable;
   }
 
+  @Override
+  protected long[] reportReadBytes() {
+    return new long[]{totalReadBytesForFetch, totalReadBytesFromDisk};
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4449d9c4/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
index d55a6db..2f97d41 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
@@ -27,13 +27,14 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class DiskFileScanScheduler extends Thread {
   private static final Log LOG = LogFactory.getLog(DiskFileScanScheduler.class);
 
 	private Queue<FileScannerV2> requestQueue = new LinkedList<FileScannerV2>();
 
-  Queue<FileScannerV2> fetchingScanners = new LinkedList<FileScannerV2>();
+  List<FileScannerV2> fetchingScanners = new ArrayList<FileScannerV2>();
 
   private int scanConcurrency;
 
@@ -51,6 +52,14 @@ public class DiskFileScanScheduler extends Thread {
 
   private FetchWaitingThread fetchWaitingThread;
 
+  private AtomicLong totalReadBytesForFetch = new AtomicLong(0);
+
+  private AtomicLong totalReadBytesFromDisk = new AtomicLong(0);
+
+  private long[] lastReportReadBytes;
+
+  private long lastReportTime = 0;
+
 	public DiskFileScanScheduler(
 			StorageManagerV2.StorgaeManagerContext smContext,
 			DiskDeviceInfo diskDeviceInfo) {
@@ -62,6 +71,11 @@ public class DiskFileScanScheduler extends Thread {
 		this.fetchWaitingThread.start();
 	}
 
+  public void incrementReadBytes(long[] readBytes) {
+    totalReadBytesForFetch.addAndGet(readBytes[0]);
+    totalReadBytesFromDisk.addAndGet(readBytes[1]);
+  }
+
   public int getDiskId() {
     return diskDeviceInfo.getId();
   }
@@ -86,10 +100,14 @@ public class DiskFileScanScheduler extends Thread {
               break;
             }
           }
+          if(fileScanner.isStopScanScheduling()) {
+            LOG.info("Exit from Disk Queue:" + fileScanner.getId());
+            continue;
+          }
           if(fileScanner.isFetchProcessing()) {
-            fetchingScanners.add(fileScanner);
             synchronized(fetchingScanners) {
-              fetchingScanners.notifyAll();
+              fetchingScanners.add(fileScanner);
+              //fetchingScanners.notifyAll();
             }
           } else {
             numOfRunningScanners.incrementAndGet();
@@ -113,27 +131,23 @@ public class DiskFileScanScheduler extends Thread {
 	}
 
   public class FetchWaitingThread extends Thread {
+    List<FileScannerV2> workList = new ArrayList<FileScannerV2>(20);
     public void run() {
       while(!stopped.get()) {
-        FileScannerV2 scanner = null;
-        synchronized(fetchingScanners) {
-          scanner = fetchingScanners.poll();
-          if(scanner == null) {
-            try {
-              fetchingScanners.wait();
-              continue;
-            } catch (InterruptedException e) {
-              break;
-            }
-          }
-        }
         try {
-          Thread.sleep(10);
+          Thread.sleep(100);
         } catch (InterruptedException e) {
           break;
         }
+        workList.clear();
+        synchronized(fetchingScanners) {
+          workList.addAll(fetchingScanners);
+          fetchingScanners.clear();
+        }
         synchronized(requestQueueMonitor) {
-          requestQueue.offer(scanner);
+          for(FileScannerV2 eachScanner: workList) {
+            requestQueue.offer(eachScanner);
+          }
           requestQueueMonitor.notifyAll();
         }
       }
@@ -165,4 +179,25 @@ public class DiskFileScanScheduler extends Thread {
 
 		this.interrupt();
 	}
+
+  public void printDiskSchedulerInfo() {
+    long currentReadBytes[] = new long[]{totalReadBytesForFetch.get(), totalReadBytesFromDisk.get()};
+    int[] throughput = new int[2];
+    if(lastReportTime != 0 && lastReportReadBytes != null) {
+      int sec = (int)((System.currentTimeMillis() - lastReportTime)/1000);
+      throughput[0] = (int)((currentReadBytes[0] - lastReportReadBytes[0])/sec);
+      throughput[1] = (int)((currentReadBytes[1] - lastReportReadBytes[1])/sec);
+    }
+    lastReportTime = System.currentTimeMillis();
+
+    LOG.info("===>" + DiskFileScanScheduler.this.diskDeviceInfo
+        + ", request=" + requestQueue.size()
+        + ", fetching=" + fetchingScanners.size()
+        + ", running=" + numOfRunningScanners.get()
+        + ", totalScan=" + totalScanCount
+        + ", FetchThroughput=" + throughput[0]/1024 + "KB"
+        + ", DiskScanThroughput=" + throughput[1]/1024 + "KB");
+
+    lastReportReadBytes = currentReadBytes;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4449d9c4/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
index a431f5d..fa757d8 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
@@ -31,16 +31,11 @@ import org.apache.tajo.storage.Scanner;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class FileScannerV2 implements Scanner {
   private static final Log LOG = LogFactory.getLog(FileScannerV2.class);
 
-  protected AtomicBoolean fetchProcessing = new AtomicBoolean(false);
-
 	protected AtomicBoolean closed = new AtomicBoolean(false);
 
 	protected FileSystem fs;
@@ -52,20 +47,28 @@ public abstract class FileScannerV2 implements Scanner {
   protected final Fragment fragment;
   protected final int columnNum;
   protected Column[] targets;
+  protected long totalScanTime = 0;
+  protected int allocatedDiskId;
 
   protected StorageManagerV2.StorgaeManagerContext smContext;
 
-  protected boolean firstSchdeuled = true;
+  protected AtomicBoolean firstSchdeuled = new AtomicBoolean(true);
+
+  protected abstract boolean scanNext(int length) throws IOException;
 
-  protected Queue<Tuple> tuplePool;
+  protected abstract boolean initFirstScan(int maxBytesPerSchedule) throws IOException;
 
-  AtomicInteger tuplePoolMemory = new AtomicInteger();
+  protected abstract long getFilePosition() throws IOException;
 
-  protected abstract Tuple getNextTuple() throws IOException;
+  protected abstract Tuple nextTuple() throws IOException;
 
-  protected abstract void initFirstScan() throws IOException;
+  public abstract boolean isFetchProcessing();
 
-  protected abstract long getFilePosition() throws IOException;
+  public abstract boolean isStopScanScheduling();
+
+  public abstract void scannerReset();
+
+  protected abstract long[] reportReadBytes();
 
 	public FileScannerV2(final Configuration conf,
                        final TableMeta meta,
@@ -77,19 +80,11 @@ public abstract class FileScannerV2 implements Scanner {
     this.columnNum = this.schema.getColumnNum();
 
     this.fs = fragment.getPath().getFileSystem(conf);
-
-    tuplePool = new ConcurrentLinkedQueue<Tuple>();
 	}
 
   public void init() throws IOException {
     closed.set(false);
-    fetchProcessing.set(false);
-    firstSchdeuled = true;
-    //tuplePoolIndex = 0;
-    if(tuplePool == null) {
-      tuplePool = new ConcurrentLinkedQueue<Tuple>();
-    }
-    tuplePool.clear();
+    firstSchdeuled.set(true);
 
     if(!inited) {
       smContext.requestFileScan(this);
@@ -99,14 +94,18 @@ public abstract class FileScannerV2 implements Scanner {
 
   @Override
   public void reset() throws IOException {
+    scannerReset();
     close();
     inited = false;
-
     init();
   }
 
+  public void setAllocatedDiskId(int allocatedDiskId) {
+    this.allocatedDiskId = allocatedDiskId;
+  }
+
   public String getId() {
-    return fragment.getPath().toString() + ":" + fragment.getStartOffset() + ":" +
+    return fragment.getPath().getName() + ":" + fragment.getStartOffset() + ":" +
         fragment.getLength() + "_" + System.currentTimeMillis();
   }
 
@@ -146,59 +145,31 @@ public abstract class FileScannerV2 implements Scanner {
     this.smContext = context;
   }
 
-  public boolean isFetchProcessing() {
-//    return fetchProcessing.get();
-    return tuplePoolMemory.get() > 16 * 1024 * 1024;
-  }
-
-  long lastScanScheduleTime;
-
   public String toString() {
     return fragment.getPath() + ":" + fragment.getStartOffset();
   }
 
   public void scan(int maxBytesPerSchedule) throws IOException {
-    if(firstSchdeuled) {
-      initFirstScan();
-      firstSchdeuled = false;
-    }
-    long scanStartPos = getFilePosition();
-    int recordCount = 0;
-    while(true) {
-      Tuple tuple = getNextTuple();
-      if(tuple == null) {
-        break;
-      }
-      tuplePoolMemory.addAndGet(tuple.size());
-      tuplePool.offer(tuple);
-      recordCount++;
-      if(recordCount % 1000 == 0) {
-        if(getFilePosition() - scanStartPos >= maxBytesPerSchedule) {
-          break;
-        } else {
-          synchronized(tuplePool) {
-            tuplePool.notifyAll();
-          }
+    long startTime = System.currentTimeMillis();
+    try {
+    synchronized(firstSchdeuled) {
+      if(firstSchdeuled.get()) {
+        boolean moreData = initFirstScan(maxBytesPerSchedule);
+        firstSchdeuled.set(false);
+        firstSchdeuled.notifyAll();
+        if(moreData) {
+          smContext.requestFileScan(this);
         }
+        return;
       }
     }
-    if(tuplePool != null) {
-      synchronized(tuplePool) {
-        tuplePool.notifyAll();
-      }
-    }
-    if(!isClosed()) {
+    boolean moreData = scanNext(maxBytesPerSchedule);
+
+    if(moreData) {
       smContext.requestFileScan(this);
     }
-  }
-
-  public void waitScanStart() {
-    //for test
-    synchronized(fetchProcessing) {
-      try {
-        fetchProcessing.wait();
-      } catch (InterruptedException e) {
-      }
+    } finally {
+      totalScanTime += System.currentTimeMillis() - startTime;
     }
   }
 
@@ -207,12 +178,10 @@ public abstract class FileScannerV2 implements Scanner {
     if(closed.get()) {
       return;
     }
+    long[] readBytes = reportReadBytes();
+    smContext.incrementReadBytes(allocatedDiskId, readBytes);
     closed.set(true);
-
-    synchronized(tuplePool) {
-      tuplePool.notifyAll();
-    }
-    LOG.info(toString() + " closed");
+    LOG.info(toString() + " closed, totalScanTime=" + totalScanTime);
   }
 
   public boolean isClosed() {
@@ -220,30 +189,14 @@ public abstract class FileScannerV2 implements Scanner {
   }
 
   public Tuple next() throws IOException {
-    if(isClosed() && tuplePool == null) {
-      return null;
-    }
-    while(true) {
-      Tuple tuple = tuplePool.poll();
-      if(tuple == null) {
-        if(isClosed()) {
-          tuplePool.clear();
-          tuplePool = null;
-          return null;
-        }
-        synchronized(tuplePool) {
-          try {
-            tuplePool.wait();
-          } catch (InterruptedException e) {
-            break;
-          }
+    synchronized(firstSchdeuled) {
+      if(firstSchdeuled.get()) {
+        try {
+          firstSchdeuled.wait();
+        } catch (InterruptedException e) {
         }
-      } else {
-        tuplePoolMemory.addAndGet(0 - tuple.size());
-        return tuple;
       }
     }
-
-    return null;
+    return nextTuple();
   }
 }


Mime
View raw message