From jh...@apache.org
Subject git commit: TAJO-717: Improve file splitting for large number of splits. (jinho)
Date Thu, 10 Apr 2014 05:54:33 GMT
Repository: tajo
Updated Branches:
  refs/heads/master d952b61ae -> d99bd085e


TAJO-717: Improve file splitting for large number of splits. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d99bd085
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d99bd085
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d99bd085

Branch: refs/heads/master
Commit: d99bd085e4f02bd11a9133c6bef8942b9dc27723
Parents: d952b61
Author: jinossy <jinossy@gmail.com>
Authored: Thu Apr 10 14:54:01 2014 +0900
Committer: jinossy <jinossy@gmail.com>
Committed: Thu Apr 10 14:54:01 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tajo/master/DefaultTaskScheduler.java       |  33 ++--
 .../tajo/master/querymaster/Repartitioner.java  |   6 +-
 tajo-storage/pom.xml                            |  28 +++
 .../tajo/storage/AbstractStorageManager.java    | 196 ++++++++++++-------
 .../tajo/storage/fragment/FileFragment.java     |  15 +-
 .../org/apache/tajo/storage/rcfile/RCFile.java  |   9 +-
 .../apache/tajo/storage/TestStorageManager.java | 111 ++++++++++-
 8 files changed, 299 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
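
In short: AbstractStorageManager.getSplits() now accepts a variable number of
input paths, so a caller like Repartitioner can fetch splits for every
partition directory in one call, and the per-block volume (disk id) lookups
are batched against HDFS instead of being issued once per file.

A minimal usage sketch of the new varargs API, assuming an already-configured
TajoConf plus table meta/schema (names are illustrative; the added tests below
show the complete setup):

    AbstractStorageManager sm = StorageManagerFactory.getStorageManager(tajoConf, tablePath);
    // One call covers all partition paths; fragments come back with host
    // information, and disk ids are attached afterwards by the batched lookup.
    List<FileFragment> splits = sm.getSplits("data", meta, schema, partA, partB, partC);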


http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a83dd25..fe2349f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -143,6 +143,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-717: Improve file splitting for large number of splits. (jinho)
+
     TAJO-356: Improve TajoClient to directly get query results in the first request.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 61fa84e..409a1b1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -44,8 +44,6 @@ import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
@@ -582,8 +580,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
    private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
    private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
    private Map<String, HostVolumeMapping> leafTaskHostMapping = new HashMap<String, HostVolumeMapping>();
-    private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping =
-        new HashMap<String, LinkedList<QueryUnitAttemptId>>();
+    private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping =
+        new HashMap<String, HashSet<QueryUnitAttemptId>>();
 
     private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
       QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
@@ -604,13 +602,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           LOG.debug("Added attempt req to host " + host);
         }
 
-        LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
+        HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
         if (list == null) {
-          list = new LinkedList<QueryUnitAttemptId>();
+          list = new HashSet<QueryUnitAttemptId>();
           leafTasksRackMapping.put(hostVolumeMapping.getRack(), list);
         }
 
-        if(!list.contains(queryUnitAttempt.getId())) list.add(queryUnitAttempt.getId());
+        list.add(queryUnitAttempt.getId());
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack());
@@ -687,14 +685,19 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
       //find task in rack
       if (attemptId == null) {
-        LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
-        while (list != null && list.size() > 0) {
-          QueryUnitAttemptId tId = list.removeFirst();
-
-          if (leafTasks.contains(tId)) {
-            leafTasks.remove(tId);
-            attemptId = tId;
-            break;
+        HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
+        if (list != null) {
+          synchronized (list) {
+            Iterator<QueryUnitAttemptId> iterator = list.iterator();
+            while (iterator.hasNext()) {
+              QueryUnitAttemptId tId = iterator.next();
+              iterator.remove();
+              if (leafTasks.contains(tId)) {
+                leafTasks.remove(tId);
+                attemptId = tId;
+                break;
+              }
+            }
           }
         }
       }
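
A note on the change above: the per-rack mapping switches from LinkedList to
HashSet, so the duplicate check that used to be an O(n) LinkedList.contains()
scan on every add becomes an implicit O(1) set insert, which matters once a
rack holds a very large number of pending splits. HashSet is unsynchronized,
so the rack-local lookup now removes candidates under an explicit lock on the
set. A compact sketch of the pattern (String stands in for QueryUnitAttemptId;
purely illustrative):

    Set<String> rackTasks = new HashSet<String>();
    rackTasks.add("ta_000001");          // no-op if already queued; replaces contains() + add()
    synchronized (rackTasks) {           // guard iteration over the unsynchronized set
      Iterator<String> it = rackTasks.iterator();
      while (it.hasNext()) {
        String candidate = it.next();
        it.remove();                     // consume the candidate, as the rack-task lookup above does
      }
    }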

http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index a72c222..6704230 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -237,10 +237,8 @@ public class Repartitioner {
                                                                          TableDesc table) throws IOException {
     List<FileFragment> fragments = Lists.newArrayList();
     PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
-    for (Path path : partitionsScan.getInputPaths()) {
-      fragments.addAll(sm.getSplits(
-          scan.getCanonicalName(), table.getMeta(), table.getSchema(), path));
-    }
+    fragments.addAll(sm.getSplits(
+        scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths()));
     partitionsScan.setInputPaths(null);
     return fragments;
   }
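
The Repartitioner change above folds the per-path loop into a single varargs
call, so listing and splitting for all partition paths happen in one pass
through the storage manager. In outline (illustrative shorthand for the
arguments):

    // Before: one getSplits() pass, and its NameNode traffic, per partition path.
    for (Path path : partitionsScan.getInputPaths()) {
      fragments.addAll(sm.getSplits(name, meta, schema, path));
    }
    // After: a single batched call; volume lookups are grouped inside getSplits().
    fragments.addAll(sm.getSplits(name, meta, schema, partitionsScan.getInputPaths()));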

http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index b9a162a..5850ed4 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -231,6 +231,34 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
index a7ed981..6615208 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -90,6 +90,11 @@ public abstract class AbstractStorageManager {
       throws IOException {
     FileSystem fs = path.getFileSystem(conf);
     FileStatus status = fs.getFileStatus(path);
+    return getFileScanner(meta, schema, path, status);
+  }
+
+  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
+      throws IOException {
     FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
     return getScanner(meta, schema, fragment);
   }
@@ -337,9 +342,8 @@ public abstract class AbstractStorageManager {
    * @return array of FileStatus objects
    * @throws IOException if zero items.
    */
-  protected List<FileStatus> listStatus(Path path) throws IOException {
+  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
     List<FileStatus> result = new ArrayList<FileStatus>();
-    Path[] dirs = new Path[]{path};
     if (dirs.length == 0) {
       throw new IOException("No input paths specified in job");
     }
@@ -392,18 +396,15 @@ public abstract class AbstractStorageManager {
    * so that Mappers process entire files.
    *
    *
-   * @param filename the file name to check
+   * @param path the file name to check
+   * @param status get the file length
    * @return is this file isSplittable?
    */
-  protected boolean isSplittable(TableMeta meta, Schema schema, Path filename) throws IOException {
-    Scanner scanner = getFileScanner(meta, schema, filename);
-    return scanner.isSplittable();
-  }
-
-
-  protected long computeSplitSize(long blockSize, long minSize,
-                                  long maxSize) {
-    return Math.max(minSize, Math.min(maxSize, blockSize));
+  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
+    Scanner scanner = getFileScanner(meta, schema, path, status);
+    boolean split = scanner.isSplittable();
+    scanner.close();
+    return split;
   }
 
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
@@ -428,22 +429,22 @@ public abstract class AbstractStorageManager {
    * A factory that makes the split for this class. It can be overridden
    * by sub-classes to make sub-types
    */
-  protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
+  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
     return new FileFragment(fragmentId, file, start, length);
   }
 
-  protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
                                    String[] hosts) {
     return new FileFragment(fragmentId, file, start, length, hosts);
   }
 
-  protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
-                                   int[] diskIds) throws IOException {
-    return new FileFragment(fragmentId, file, blockLocation, diskIds);
+  protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
+      throws IOException {
+    return new FileFragment(fragmentId, file, blockLocation);
   }
 
   // for Non Splittable. eg, compressed gzip TextFile
-  protected FileFragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+  protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
                                       BlockLocation[] blkLocations) throws IOException {
 
     Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
@@ -535,79 +536,128 @@ public abstract class AbstractStorageManager {
    *
    * @throws IOException
    */
-  public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException {
+  public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
+      throws IOException {
     // generate splits'
 
-    List<FileFragment> splits = new ArrayList<FileFragment>();
-    FileSystem fs = inputPath.getFileSystem(conf);
-    List<FileStatus> files;
-    if (fs.isFile(inputPath)) {
-      files = Lists.newArrayList(fs.getFileStatus(inputPath));
-    } else {
-      files = listStatus(inputPath);
-    }
-    for (FileStatus file : files) {
-      Path path = file.getPath();
-      long length = file.getLen();
-      if (length > 0) {
-        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-        boolean splittable = isSplittable(meta, schema, path);
-        if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
-          // supported disk volume
-          BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
-              .getFileBlockStorageLocations(Arrays.asList(blkLocations));
-          if (splittable) {
-            for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-              splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
-                  .getVolumeIds())));
-            }
-          } else { // Non splittable
-            long blockSize = blockStorageLocations[0].getLength();
-            if (blockSize >= length) {
-              for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-                splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
-                    .getVolumeIds())));
+    List<FileFragment> splits = Lists.newArrayList();
+    List<FileFragment> volumeSplits = Lists.newArrayList();
+    List<BlockLocation> blockLocations = Lists.newArrayList();
+
+    for (Path p : inputs) {
+      FileSystem fs = p.getFileSystem(conf);
+      ArrayList<FileStatus> files = Lists.newArrayList();
+      if (fs.isFile(p)) {
+        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
+      } else {
+        files.addAll(listStatus(p));
+      }
+
+      int previousSplitSize = splits.size();
+      for (FileStatus file : files) {
+        Path path = file.getPath();
+        long length = file.getLen();
+        if (length > 0) {
+          // Get locations of blocks of file
+          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+          boolean splittable = isSplittable(meta, schema, path, file);
+          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+
+            if (splittable) {
+              for (BlockLocation blockLocation : blkLocations) {
+                volumeSplits.add(makeSplit(tableName, path, blockLocation));
+              }
+              blockLocations.addAll(Arrays.asList(blkLocations));
+
+            } else { // Non splittable
+              long blockSize = blkLocations[0].getLength();
+              if (blockSize >= length) {
+                blockLocations.addAll(Arrays.asList(blkLocations));
+                for (BlockLocation blockLocation : blkLocations) {
+                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
+                }
+              } else {
+                splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
               }
-            } else {
-              splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
             }
-          }
 
-        } else {
-          if (splittable) {
+          } else {
+            if (splittable) {
 
-            long minSize = Math.max(getMinSplitSize(), 1);
+              long minSize = Math.max(getMinSplitSize(), 1);
 
-            long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one
-            long splitSize = Math.max(minSize, blockSize);
-            long bytesRemaining = length;
+              long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one
+              long splitSize = Math.max(minSize, blockSize);
+              long bytesRemaining = length;
 
-            // for s3
-            while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-              int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-              splits.add(makeSplit(tableName, meta, path, length - bytesRemaining, splitSize,
-                  blkLocations[blkIndex].getHosts()));
-              bytesRemaining -= splitSize;
-            }
-            if (bytesRemaining > 0) {
-              int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-              splits.add(makeSplit(tableName, meta, path, length - bytesRemaining, bytesRemaining,
-                  blkLocations[blkIndex].getHosts()));
+              // for s3
+              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
+                    blkLocations[blkIndex].getHosts()));
+                bytesRemaining -= splitSize;
+              }
+              if (bytesRemaining > 0) {
+                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
+                    blkLocations[blkIndex].getHosts()));
+              }
+            } else { // Non splittable
+              splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
             }
-          } else { // Non splittable
-            splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
           }
+        } else {
+          //for zero length files
+          splits.add(makeSplit(tableName, path, 0, length));
         }
-      } else {
-        //for zero length files
-        splits.add(makeSplit(tableName, meta, path, 0, length));
+      }
+      if(LOG.isDebugEnabled()){
+        LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
       }
     }
 
+    // Combine original fileFragments with new VolumeId information
+    setVolumeMeta(volumeSplits, blockLocations);
+    splits.addAll(volumeSplits);
     LOG.info("Total # of splits: " + splits.size());
     return splits;
   }
 
+  private void setVolumeMeta(List<FileFragment> splits, final List<BlockLocation> blockLocations)
+      throws IOException {
+
+    int locationSize = blockLocations.size();
+    int splitSize = splits.size();
+    if (locationSize == 0 || splitSize == 0) return;
+
+    if (locationSize != splitSize) {
+      // splits and locations don't match up
+      LOG.warn("Number of block locations not equal to number of splits: "
+          + "#locations=" + locationSize
+          + " #splits=" + splitSize);
+      return;
+    }
+
+    DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
+    int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
+    int blockLocationIdx = 0;
+
+    Iterator<FileFragment> iter = splits.iterator();
+    while (locationSize > blockLocationIdx) {
+
+      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
+      List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
+      //BlockStorageLocation containing additional volume location information for each replica of each block.
+      BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
+
+      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+        iter.next().setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
+        blockLocationIdx++;
+      }
+    }
+    LOG.info("# of splits with volumeId " + splitSize);
+  }
+
   private static class InvalidInputException extends IOException {
     List<IOException> errors;
     public InvalidInputException(List<IOException> errors) {
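
The core of the improvement is setVolumeMeta() above: splittable fragments are
first created without disk ids, and the BlockStorageLocation lookups that used
to hit the NameNode once per file are grouped into sub-lists bounded by
dfs.ls.limit. A condensed sketch of that batching loop (it assumes, as the
real code does, that volume splits and block locations line up one-to-one):

    int limit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
    Iterator<FileFragment> frag = volumeSplits.iterator();
    for (int offset = 0; offset < blockLocations.size(); offset += limit) {
      List<BlockLocation> chunk =
          blockLocations.subList(offset, Math.min(offset + limit, blockLocations.size()));
      // One batched call resolves volume ids for a whole chunk of blocks.
      for (BlockStorageLocation loc : fs.getFileBlockStorageLocations(chunk)) {
        frag.next().setDiskIds(getDiskIds(loc.getVolumeIds()));  // -1 marks an unknown volume
      }
    }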

http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
index ea8bf9f..6fe6841 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -50,16 +50,17 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
     init(builder.build());
   }
 
-  public FileFragment(String tableName, Path uri, BlockLocation blockLocation, int[] diskIds)
+  public FileFragment(String tableName, Path uri, BlockLocation blockLocation)
       throws IOException {
-    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(),
-        blockLocation.getHosts(), diskIds);
+    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null);
   }
 
+  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) {
+    this.set(tableName, uri, start, length, hosts, diskIds);
+  }
   // Non splittable
  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
-    this.set(tableName, uri, start, length, null, null);
-    this.hosts = hosts;
+    this.set(tableName, uri, start, length, hosts, null);
   }
 
   public FileFragment(String fragmentId, Path path, long start, long length) {
@@ -115,6 +116,10 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
     return diskIds;
   }
 
+  public void setDiskIds(int[] diskIds){
+    this.diskIds = diskIds;
+  }
+
   public String getTableName() {
     return this.tableName;
   }
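
FileFragment correspondingly drops the diskIds argument from its BlockLocation
constructor and gains the setDiskIds() mutator, which is what lets
setVolumeMeta() attach volume information after the batched lookup completes.
For illustration:

    FileFragment fragment = new FileFragment("data", path, blockLocation);  // disk ids start unknown
    fragment.setDiskIds(new int[]{0, 1});  // filled in later, in bulk, by setVolumeMeta()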

http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index bbb9df1..1beea99 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -1134,14 +1134,14 @@ public class RCFile {
     private CompressionCodec codec = null;
     private Metadata metadata = null;
 
-    private final byte[] sync = new byte[SYNC_HASH_SIZE];
-    private final byte[] syncCheck = new byte[SYNC_HASH_SIZE];
+    private byte[] sync;
+    private byte[] syncCheck;
     private boolean syncSeen;
     private long lastSeenSyncPos = 0;
 
     private long headerEnd;
     private long start, end;
-    private long startOffset, endOffset;
+    private final long startOffset, endOffset;
     private int[] targetColumnIndexes;
 
     private int currentKeyLength;
@@ -1188,6 +1188,9 @@ public class RCFile {
 
     @Override
     public void init() throws IOException {
+      sync = new byte[SYNC_HASH_SIZE];
+      syncCheck = new byte[SYNC_HASH_SIZE];
+
       more = startOffset < endOffset;
       rowId = new LongWritable();
       readBytes = 0;
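
The RCFile change is a small allocation tweak in the same spirit: the sync
buffers move from field initializers into init(), so a reader constructed only
to probe splittability (as the new isSplittable() does before closing the
scanner immediately) presumably never allocates them. The pattern, in sketch
form:

    private byte[] sync;                     // no longer allocated at construction time
    public void init() throws IOException {
      sync = new byte[SYNC_HASH_SIZE];       // allocated only when the reader is actually used
    }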

http://git-wip-us.apache.org/repos/asf/tajo/blob/d99bd085/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
index 083670a..be8b6de 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
@@ -18,8 +18,11 @@
 
 package org.apache.tajo.storage;
 
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -28,14 +31,18 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 public class TestStorageManager {
 	private TajoConf conf;
@@ -43,6 +50,7 @@ public class TestStorageManager {
 	AbstractStorageManager sm = null;
   private Path testDir;
   private FileSystem fs;
+
 	@Before
 	public void setUp() throws Exception {
 		conf = new TajoConf();
@@ -90,4 +98,105 @@ public class TestStorageManager {
 		}
 		assertEquals(4,i);
 	}
+
+  @Test
+  public void testGetSplit() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+
+    int testCount = 100;
+    Path tablePath = new Path("/testGetSplit");
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Create test partitions
+      List<Path> partitions = Lists.newArrayList();
+      for (int i =0; i < testCount; i++){
+        Path tmpFile = new Path(tablePath, String.valueOf(i));
+        DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl);
+        partitions.add(tmpFile);
+      }
+
+      assertTrue(fs.exists(tablePath));
+      AbstractStorageManager sm = StorageManagerFactory.getStorageManager(new TajoConf(conf), tablePath);
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age",Type.INT4);
+      schema.addColumn("name",Type.TEXT);
+      TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+      List<FileFragment> splits = Lists.newArrayList();
+      // Get FileFragments in partition batch
+      splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+      assertEquals(testCount, splits.size());
+      // -1 is unknown volumeId
+      assertEquals(-1, splits.get(0).getDiskIds()[0]);
+
+      splits.clear();
+      splits.addAll(sm.getSplits("data", meta, schema,
+          partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
+      assertEquals(testCount / 2, splits.size());
+      assertEquals(1, splits.get(0).getHosts().length);
+      assertEquals(-1, splits.get(0).getDiskIds()[0]);
+      fs.close();
+    } finally {
+      cluster.shutdown();
+
+      File dir = new File(testDataPath);
+      dir.delete();
+    }
+  }
+
+  @Test
+  public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+
+    int testCount = 100;
+    Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Create test files
+      for (int i = 0; i < testCount; i++) {
+        Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
+        DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
+      }
+      assertTrue(fs.exists(tablePath));
+      AbstractStorageManager sm = StorageManagerFactory.getStorageManager(new TajoConf(conf), tablePath);
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age", Type.INT4);
+      schema.addColumn("name", Type.TEXT);
+      TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+      List<FileFragment> splits = Lists.newArrayList();
+      splits.addAll(sm.getSplits("data", meta, schema, tablePath));
+
+      assertEquals(testCount, splits.size());
+      assertEquals(2, splits.get(0).getHosts().length);
+      assertEquals(2, splits.get(0).getDiskIds().length);
+      assertNotEquals(-1, splits.get(0).getDiskIds()[0]);
+      fs.close();
+    } finally {
+      cluster.shutdown();
+
+      File dir = new File(testDataPath);
+      dir.delete();
+    }
+  }
 }

