incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/3] git commit: Trying to reduce NameNode load.
Date Tue, 28 Apr 2015 13:05:47 GMT
Trying to reduce NameNode load.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/0092462c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/0092462c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/0092462c

Branch: refs/heads/master
Commit: 0092462c657e17dc6f7f414fda147a32147b27aa
Parents: 34c7edb
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Apr 28 09:02:05 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Apr 28 09:02:05 2015 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurInputFormat.java     | 36 +++++++++++++++-----
 .../blur/mapreduce/lib/GenericRecordReader.java |  6 ++--
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 10 ++++--
 3 files changed, 40 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0092462c/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
index c89196e..a0cc7de 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
@@ -20,9 +20,9 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -175,7 +175,7 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
       Text snapshot) throws IOException {
     final long start = System.nanoTime();
     List<BlurInputSplit> splits = new ArrayList<BlurInputSplit>();
-    Directory directory = getDirectory(configuration, table.toString(), shardDir);
+    Directory directory = getDirectory(configuration, table.toString(), shardDir, null);
     try {
       SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(configuration,
           SnapshotIndexDeletionPolicy.getGenerationsPath(shardDir));
@@ -197,12 +197,14 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
           LOG.info("Segment [{0}] in dir [{1}] has all records deleted.", segmentInfo.name,
shardDir);
         } else {
           String name = segmentInfo.name;
-          Set<String> files = segmentInfo.files();
+          Collection<String> files = commit.files();
           long fileLength = 0;
           for (String file : files) {
             fileLength += directory.fileLength(file);
           }
-          splits.add(new BlurInputSplit(shardDir, segmentsFileName, name, fileLength, table));
+          List<String> dirFiles = new ArrayList<String>(files);
+          dirFiles.add(segmentsFileName);
+          splits.add(new BlurInputSplit(shardDir, segmentsFileName, name, fileLength, table,
dirFiles));
         }
       }
       return splits;
@@ -271,17 +273,24 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     private Path _dir;
     private String _segmentInfoName;
     private Text _table = new Text();
+    private List<String> _directoryFiles;
 
     public BlurInputSplit() {
 
     }
 
-    public BlurInputSplit(Path dir, String segmentsName, String segmentInfoName, long fileLength,
Text table) {
+    public BlurInputSplit(Path dir, String segmentsName, String segmentInfoName, long fileLength,
Text table,
+        List<String> directoryFiles) {
       _fileLength = fileLength;
       _segmentsName = segmentsName;
       _segmentInfoName = segmentInfoName;
       _table = table;
       _dir = dir;
+      _directoryFiles = directoryFiles;
+    }
+
+    public List<String> getDirectoryFiles() {
+      return _directoryFiles;
     }
 
     @Override
@@ -318,6 +327,11 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
       writeString(out, _segmentInfoName);
       _table.write(out);
       out.writeLong(_fileLength);
+      int size = _directoryFiles.size();
+      out.writeInt(size);
+      for (String s : _directoryFiles) {
+        writeString(out, s);
+      }
     }
 
     @Override
@@ -327,6 +341,11 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
       _segmentInfoName = readString(in);
       _table.readFields(in);
       _fileLength = in.readLong();
+      int size = in.readInt();
+      _directoryFiles = new ArrayList<String>();
+      for (int i = 0; i < size; i++) {
+        _directoryFiles.add(readString(in));
+      }
     }
 
     private void writeString(DataOutput out, String s) throws IOException {
@@ -369,11 +388,12 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     putSnapshotForTable(job.getConfiguration(), tableName, snapshot);
   }
 
-  public static Directory getDirectory(Configuration configuration, String table, Path shardDir)
throws IOException {
+  public static Directory getDirectory(Configuration configuration, String table, Path shardDir,
List<String> files)
+      throws IOException {
     Path fastPath = DirectoryUtil.getFastDirectoryPath(shardDir);
     FileSystem fileSystem = shardDir.getFileSystem(configuration);
     boolean disableFast = !fileSystem.exists(fastPath);
-    return DirectoryUtil.getDirectory(configuration, new HdfsDirectory(configuration, shardDir),
disableFast, null,
-        table, shardDir.getName(), true);
+    HdfsDirectory directory = new HdfsDirectory(configuration, shardDir, null, files);
+    return DirectoryUtil.getDirectory(configuration, directory, disableFast, null, table,
shardDir.getName(), true);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0092462c/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
index 4b9b526..af5ce9f 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
@@ -18,6 +18,7 @@ package org.apache.blur.mapreduce.lib;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -66,8 +67,9 @@ public class GenericRecordReader {
     _setup = true;
     _table = blurInputSplit.getTable();
     Path localCachePath = BlurInputFormat.getLocalCachePath(configuration);
+    List<String> files = blurInputSplit.getDirectoryFiles();
     LOG.info("Local cache path [{0}]", localCachePath);
-    _directory = BlurInputFormat.getDirectory(configuration, _table.toString(), blurInputSplit.getDir());
+    _directory = BlurInputFormat.getDirectory(configuration, _table.toString(), blurInputSplit.getDir(),
files);
 
     SegmentInfos segmentInfos = new SegmentInfos();
     segmentInfos.read(_directory, blurInputSplit.getSegmentsName());
@@ -100,7 +102,7 @@ public class GenericRecordReader {
       Path localCachePath, Collection<String> files) throws IOException {
     LOG.info("Copying files need to local cache for faster reads [{0}].", shardDir);
     Path localShardPath = new Path(new Path(localCachePath, table), shardDir.getName());
-    HdfsDirectory localDir = new HdfsDirectory(configuration, localShardPath);
+    HdfsDirectory localDir = new HdfsDirectory(configuration, localShardPath, null, files);
     for (String name : files) {
       if (!isValidFileToCache(name)) {
         continue;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0092462c/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 948bc30..930e2a2 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -190,8 +190,14 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
       } else {
         for (String file : filesToExpose) {
           Path filePath = getPath(file);
-          FileStatus fileStatus = _fileSystem.getFileStatus(filePath);
-          addToCache(fileStatus);
+          try {
+            FileStatus fileStatus = _fileSystem.getFileStatus(filePath);
+            if (fileStatus != null) {
+              addToCache(fileStatus);
+            }
+          } catch (FileNotFoundException e) {
+            // Normal hdfs behavior
+          }
         }
       }
     }


Mime
View raw message