incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject git commit: Adding a file manifest cache in the HDFS directory to greatly increase the opening speed of the directory. If the manifest is not present, it will be rebuilt.
Date Wed, 12 Aug 2015 13:29:15 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 13b22b2f5 -> d190c10cd


Adding a file manifest cache in the HDFS directory to greatly increase the opening
speed of the directory.  If the manifest is not present, it will be rebuilt.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/d190c10c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/d190c10c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/d190c10c

Branch: refs/heads/master
Commit: d190c10cd2762b04c4bca2ed62fbc664ba951108
Parents: 13b22b2
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Aug 12 09:28:55 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Aug 12 09:28:55 2015 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurInputFormat.java     |  21 +-
 .../blur/mapreduce/lib/GenericRecordReader.java |   6 +-
 .../blur/mapreduce/lib/update/Driver.java       |   1 +
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 193 ++++++++++++++++---
 .../HdfsDirectoryManifestFileCacheTest.java     | 173 +++++++++++++++++
 .../store/hdfs/HdfsDirectoryResourceTest.java   |   2 +-
 6 files changed, 341 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
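
For orientation before the diffs: with this change, the first open of a shard directory that has no manifest lists the HDFS directory and writes a file_manifest, while later opens read the manifest instead of listing (when the directory's file-status cache is in use). A minimal caller-side sketch of that behavior; the configuration and shard path below are illustrative, not taken from the commit:

    import org.apache.blur.store.hdfs.HdfsDirectory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class ManifestOpenSketch {
      public static void main(String[] args) throws Exception {
        // Open the same shard twice; the second open should load the
        // file_manifest written by the first instead of listing HDFS.
        Configuration conf = new Configuration();
        Path shardDir = new Path("hdfs://namenode/blur/tables/table1/shard-00000000"); // illustrative
        HdfsDirectory first = new HdfsDirectory(conf, shardDir);  // no manifest: lists dir, writes one
        first.close();
        HdfsDirectory second = new HdfsDirectory(conf, shardDir); // manifest present: loads it
        second.close();
      }
    }
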


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d190c10c/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
index f52842a..c05d84b 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
@@ -161,14 +161,6 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     return list;
   }
 
-  private static BlurArray toBlurArrayFromStringList(List<String> list) {
-    BlurArray array = new BlurArray();
-    for (String s : list) {
-      array.put(s);
-    }
-    return array;
-  }
-
   private static BlurObject toBlurObject(BlurInputSplit inputSplit) throws IOException {
     BlurObject blurObject = new BlurObject();
     blurObject.put("dir", inputSplit.getDir().toString());
@@ -176,7 +168,6 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     blurObject.put("segmentInfoName", inputSplit.getSegmentInfoName());
     blurObject.put("fileLength", inputSplit.getLength());
     blurObject.put("table", inputSplit.getTable().toString());
-    blurObject.put("directoryFiles", toBlurArrayFromStringList(inputSplit.getDirectoryFiles()));
     return blurObject;
   }
 
@@ -378,7 +369,7 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
   private static List<BlurInputSplit> getSegmentSplits(Path shardDir, Configuration configuration, Text table,
       Text snapshot) throws IOException {
     final long start = System.nanoTime();
-    Directory directory = getDirectory(configuration, table.toString(), shardDir, null);
+    Directory directory = getDirectory(configuration, table.toString(), shardDir);
     try {
       return getSplitForDirectory(shardDir, configuration, table, snapshot, directory);
     } finally {
@@ -500,11 +491,6 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
       _segmentInfoName = segmentInfoName;
       _table = table;
       _dir = dir;
-      _directoryFiles = directoryFiles;
-    }
-
-    public List<String> getDirectoryFiles() {
-      return _directoryFiles;
     }
 
     @Override
@@ -602,12 +588,11 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     putSnapshotForTable(job.getConfiguration(), tableName, snapshot);
   }
 
-  public static Directory getDirectory(Configuration configuration, String table, Path shardDir, List<String> files)
-      throws IOException {
+  public static Directory getDirectory(Configuration configuration, String table, Path shardDir) throws IOException {
     Path fastPath = DirectoryUtil.getFastDirectoryPath(shardDir);
     FileSystem fileSystem = shardDir.getFileSystem(configuration);
     boolean disableFast = !fileSystem.exists(fastPath);
-    HdfsDirectory directory = new HdfsDirectory(configuration, shardDir, null, files);
+    HdfsDirectory directory = new HdfsDirectory(configuration, shardDir, null);
     return DirectoryUtil.getDirectory(configuration, directory, disableFast, null, table, shardDir.getName(), true);
   }
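
With the per-split file list gone, a caller of the new getDirectory only supplies the shard path. A minimal sketch of the simplified call; the configuration, table name, and path below are illustrative:

    import org.apache.blur.mapreduce.lib.BlurInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.lucene.store.Directory;

    public class GetDirectorySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path shardDir = new Path("hdfs://namenode/blur/tables/table1/shard-00000000"); // illustrative
        // The HdfsDirectory built inside now discovers files via the manifest
        // cache rather than an explicitly passed list.
        Directory dir = BlurInputFormat.getDirectory(conf, "table1", shardDir);
        dir.close();
      }
    }
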
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d190c10c/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
index 1335a73..4ebac47 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
@@ -18,7 +18,6 @@ package org.apache.blur.mapreduce.lib;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -72,9 +71,8 @@ public class GenericRecordReader {
     _setup = true;
     _table = blurInputSplit.getTable();
     Path localCachePath = BlurInputFormat.getLocalCachePath(configuration);
-    List<String> files = blurInputSplit.getDirectoryFiles();
     LOG.info("Local cache path [{0}]", localCachePath);
-    _directory = BlurInputFormat.getDirectory(configuration, _table.toString(), blurInputSplit.getDir(), files);
+    _directory = BlurInputFormat.getDirectory(configuration, _table.toString(), blurInputSplit.getDir());
 
     SegmentInfoPerCommit commit = segmentInfosRead(_directory, blurInputSplit.getSegmentsName(),
         blurInputSplit.getSegmentInfoName());
@@ -150,7 +148,7 @@ public class GenericRecordReader {
       Path localCachePath, Collection<String> files) throws IOException {
     LOG.info("Copying files need to local cache for faster reads [{0}].", shardDir);
     Path localShardPath = new Path(new Path(localCachePath, table), shardDir.getName());
-    HdfsDirectory localDir = new HdfsDirectory(configuration, localShardPath, null, files);
+    HdfsDirectory localDir = new HdfsDirectory(configuration, localShardPath, null);
     for (String name : files) {
       if (!isValidFileToCache(name)) {
         continue;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d190c10c/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
index 04cd81e..10cdcab 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
@@ -65,6 +65,7 @@ public class Driver extends Configured implements Tool {
     if (args.length < 5) {
       System.err
           .println("Usage Driver <table> <mr inc working path> <output path>
<zk connection> <reducer multipler> <extra config files...>");
+      return 1;
     }
     String table = args[c++];
     String mrIncWorkingPathStr = args[c++];

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d190c10c/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 4df1c90..116cd72 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.Timer;
@@ -39,6 +40,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
@@ -109,7 +113,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   protected final Path _path;
   protected final FileSystem _fileSystem;
   protected final MetricsGroup _metricsGroup;
-  protected final Map<String, FStat> _fileStatusMap = new ConcurrentHashMap<String, FStat>();
+  protected final FStatusCache _fileStatusCache;
   protected final Map<String, Boolean> _symlinkMap = new ConcurrentHashMap<String, Boolean>();
   protected final Map<String, Path> _symlinkPathMap = new ConcurrentHashMap<String, Path>();
   protected final Map<String, Boolean> _copyFileMap = new ConcurrentHashMap<String, Boolean>();
@@ -119,22 +123,157 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   protected final SequentialReadControl _sequentialReadControl;
   protected final boolean _resourceTracking;
 
+  static class FStatusCache {
+
+    final Map<String, FStat> _cache = new ConcurrentHashMap<String, FStat>();
+    final Path _path;
+    final FileSystem _fileSystem;
+    final Path _newManifest;
+    final Path _manifest;
+    final WriteLock _writeLock;
+    final ReadLock _readLock;
+    final Path _newManifestTmp;
+
+    public FStatusCache(FileSystem fileSystem, Path path) {
+      _fileSystem = fileSystem;
+      _path = path;
+      _newManifest = new Path(_path, "file_manifest.new");
+      _newManifestTmp = new Path(_path, "file_manifest.tmp");
+      _manifest = new Path(_path, "file_manifest");
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+      _writeLock = lock.writeLock();
+      _readLock = lock.readLock();
+    }
+
+    public void putFStat(String name, FStat fStat) throws IOException {
+      _writeLock.lock();
+      try {
+        _cache.put(name, fStat);
+        syncFileCache();
+      } finally {
+        _writeLock.unlock();
+      }
+    }
+
+    public void removeFStat(String name) throws IOException {
+      _writeLock.lock();
+      try {
+        _cache.remove(name);
+        syncFileCache();
+      } finally {
+        _writeLock.unlock();
+      }
+    }
+
+    public Set<String> getNames() {
+      _readLock.lock();
+      try {
+        return new HashSet<String>(_cache.keySet());
+      } finally {
+        _readLock.unlock();
+      }
+    }
+
+    public boolean containsFile(String name) {
+      _readLock.lock();
+      try {
+        return _cache.containsKey(name);
+      } finally {
+        _readLock.unlock();
+      }
+    }
+
+    public FStat getFStat(String name) {
+      _readLock.lock();
+      try {
+        return _cache.get(name);
+      } finally {
+        _readLock.unlock();
+      }
+    }
+
+    public boolean loadCacheFromManifest() throws IOException {
+      // Check file_manifest.new first; if it doesn't exist, check
+      // file_manifest; if that doesn't exist either, the cache can't be loaded.
+      if (_fileSystem.exists(_newManifest)) {
+        loadCacheFromManifest(_newManifest);
+        return true;
+      } else if (_fileSystem.exists(_manifest)) {
+        loadCacheFromManifest(_manifest);
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    private void syncFileCache() throws IOException {
+      FSDataOutputStream outputStream = _fileSystem.create(_newManifestTmp, true);
+      writeFileCache(outputStream);
+      outputStream.close();
+      _fileSystem.delete(_newManifest, false);
+      if (_fileSystem.rename(_newManifestTmp, _newManifest)) {
+        _fileSystem.delete(_manifest, false);
+        if (_fileSystem.rename(_newManifest, _manifest)) {
+          LOG.info("Manifest sync complete for [{0}]", _manifest);
+        } else {
+          throw new IOException("Could not rename [" + _newManifest + "] to [" + _manifest
+ "]");
+        }
+      } else {
+        throw new IOException("Could not rename [" + _newManifestTmp + "] to [" + _newManifest
+ "]");
+      }
+    }
+
+    private void writeFileCache(FSDataOutputStream outputStream) throws IOException {
+      Set<Entry<String, FStat>> entrySet = _cache.entrySet();
+      outputStream.writeInt(_cache.size());
+      for (Entry<String, FStat> e : entrySet) {
+        String name = e.getKey();
+        FStat fstat = e.getValue();
+        writeString(outputStream, name);
+        outputStream.writeLong(fstat._lastMod);
+        outputStream.writeLong(fstat._length);
+      }
+    }
+
+    private void loadCacheFromManifest(Path manifest) throws IOException {
+      FSDataInputStream inputStream = _fileSystem.open(manifest);
+      int count = inputStream.readInt();
+      for (int i = 0; i < count; i++) {
+        String name = readString(inputStream);
+        long lastMod = inputStream.readLong();
+        long length = inputStream.readLong();
+        FStat fstat = new FStat(lastMod, length);
+        _cache.put(name, fstat);
+      }
+      inputStream.close();
+    }
+
+    private String readString(FSDataInputStream inputStream) throws IOException {
+      int length = inputStream.readInt();
+      byte[] buf = new byte[length];
+      inputStream.readFully(buf);
+      return new String(buf, UTF_8);
+    }
+
+    private void writeString(FSDataOutputStream outputStream, String s) throws IOException {
+      byte[] bs = s.getBytes(UTF_8);
+      outputStream.writeInt(bs.length);
+      outputStream.write(bs);
+    }
+
+  }
+
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
     this(configuration, path, new SequentialReadControl(new BlurConfiguration()));
   }
 
   public HdfsDirectory(Configuration configuration, Path path, SequentialReadControl sequentialReadControl)
       throws IOException {
-    this(configuration, path, sequentialReadControl, null);
-  }
-
-  public HdfsDirectory(Configuration configuration, Path path, SequentialReadControl sequentialReadControl,
-      Collection<String> filesToExpose) throws IOException {
-    this(configuration, path, sequentialReadControl, filesToExpose, false);
+    this(configuration, path, sequentialReadControl, false);
   }
 
   public HdfsDirectory(Configuration configuration, Path path, SequentialReadControl sequentialReadControl,
-      Collection<String> filesToExpose, boolean resourceTracking) throws IOException {
+      boolean resourceTracking) throws IOException {
     _resourceTracking = resourceTracking;
     if (sequentialReadControl == null) {
       _sequentialReadControl = new SequentialReadControl(new BlurConfiguration());
@@ -163,25 +302,15 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     }
 
     if (_useCache) {
-      if (filesToExpose == null) {
+      _fileStatusCache = new FStatusCache(_fileSystem, _path);
+      if (!_fileStatusCache.loadCacheFromManifest()) {
         FileStatus[] listStatus = _fileSystem.listStatus(_path);
         for (FileStatus fileStatus : listStatus) {
           addToCache(fileStatus);
         }
-      } else {
-        for (String file : filesToExpose) {
-          Path filePath = getPathOrSymlinkForDelete(file);
-          try {
-            FileStatus fileStatus = _fileSystem.getFileStatus(filePath);
-            if (fileStatus != null) {
-              addToCache(fileStatus);
-            }
-          } catch (FileNotFoundException e) {
-            // Normal hdfs behavior
-            LOG.info("Lucene file [{0}] path [{1}] was not found.", file, filePath);
-          }
-        }
       }
+    } else {
+      _fileStatusCache = null;
     }
   }
 
@@ -202,7 +331,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
         lastMod = fileStatus.getModificationTime();
       }
       length = length(resolvedName);
-      _fileStatusMap.put(resolvedName, new FStat(lastMod, length));
+      _fileStatusCache.putFStat(resolvedName, new FStat(lastMod, length));
     }
   }
 
@@ -267,7 +396,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
       deleteFile(name);
     }
     if (_useCache) {
-      _fileStatusMap.put(name, new FStat(System.currentTimeMillis(), 0L));
+      _fileStatusCache.putFStat(name, new FStat(System.currentTimeMillis(), 0L));
     }
     final FSDataOutputStream outputStream = openForOutput(name);
     trackObject(outputStream, "Outputstream", name, _path);
@@ -292,7 +421,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
         super.close();
         long length = outputStream.getPos();
         if (_useCache) {
-          _fileStatusMap.put(name, new FStat(System.currentTimeMillis(), length));
+          _fileStatusCache.putFStat(name, new FStat(System.currentTimeMillis(), length));
         }
         // This exists because HDFS is so slow to close files. There are
         // built-in sleeps during the close call.
@@ -346,7 +475,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     LOG.debug("listAll [{0}]", getPath());
 
     if (_useCache) {
-      Set<String> names = new HashSet<String>(_fileStatusMap.keySet());
+      Set<String> names = _fileStatusCache.getNames();
       return names.toArray(new String[names.size()]);
     }
 
@@ -381,7 +510,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   public boolean fileExists(String name) throws IOException {
     LOG.debug("fileExists [{0}] [{1}]", name, getPath());
     if (_useCache) {
-      return _fileStatusMap.containsKey(name);
+      return _fileStatusCache.containsFile(name);
     }
     return exists(name);
   }
@@ -401,7 +530,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     LOG.debug("deleteFile [{0}] [{1}]", name, getPath());
     if (fileExists(name)) {
       if (_useCache) {
-        _fileStatusMap.remove(name);
+        _fileStatusCache.removeFStat(name);
       }
       delete(name);
     } else {
@@ -427,7 +556,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   public long fileLength(String name) throws IOException {
     LOG.debug("fileLength [{0}] [{1}]", name, getPath());
     if (_useCache) {
-      FStat fStat = _fileStatusMap.get(name);
+      FStat fStat = _fileStatusCache.getFStat(name);
       if (fStat == null) {
         throw new FileNotFoundException(name);
       }
@@ -545,7 +674,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
 
   public long getFileModified(String name) throws IOException {
     if (_useCache) {
-      FStat fStat = _fileStatusMap.get(name);
+      FStat fStat = _fileStatusCache.getFStat(name);
       if (fStat == null) {
         throw new FileNotFoundException("File [" + name + "] not found");
       }
@@ -560,7 +689,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     try {
       FileStatus fileStatus = _fileSystem.getFileStatus(path);
       if (_useCache) {
-        _fileStatusMap.put(name, new FStat(fileStatus));
+        _fileStatusCache.putFStat(name, new FStat(fileStatus));
       }
       return fileStatus.getModificationTime();
     } finally {
@@ -592,7 +721,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     outputStream.write(srcPath.toString().getBytes(UTF_8));
     outputStream.close();
     if (_useCache) {
-      to._fileStatusMap.put(dest, _fileStatusMap.get(src));
+      to._fileStatusCache.putFStat(dest, _fileStatusCache.getFStat(src));
     }
     return true;
   }
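
The manifest written above is a flat binary file: an int entry count, then for each file a length-prefixed UTF-8 name followed by two longs (last-modified time and length). syncFileCache writes to file_manifest.tmp, renames it to file_manifest.new, then renames that to file_manifest, so a reader always finds a complete copy under one of the two names checked by loadCacheFromManifest. A standalone sketch of reading that record layout; this class is illustrative and not part of the commit:

    import java.io.DataInput;
    import java.io.IOException;
    import java.nio.charset.Charset;

    public class ManifestDumpSketch {
      private static final Charset UTF_8 = Charset.forName("UTF-8");

      // Mirrors FStatusCache.writeFileCache: an int count, then per entry an
      // int name length, the UTF-8 name bytes, a long lastMod, a long length.
      public static void dump(DataInput in) throws IOException {
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
          int nameLength = in.readInt();
          byte[] nameBytes = new byte[nameLength];
          in.readFully(nameBytes);
          String name = new String(nameBytes, UTF_8);
          long lastMod = in.readLong(); // FStat._lastMod
          long length = in.readLong();  // FStat._length
          System.out.println(name + " lastMod=" + lastMod + " length=" + length);
        }
      }
    }
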

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d190c10c/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryManifestFileCacheTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryManifestFileCacheTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryManifestFileCacheTest.java
new file mode 100644
index 0000000..aa5cad5
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryManifestFileCacheTest.java
@@ -0,0 +1,173 @@
+package org.apache.blur.store.hdfs;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class HdfsDirectoryManifestFileCacheTest {
+
+  private Configuration _configuration = new Configuration();
+  private Path _root;
+  private FileSystem _fileSystem;
+
+  @Before
+  public void setup() throws IOException {
+    _root = new Path(new File("target/tmp/HdfsDirectoryFileCacheTest").getAbsolutePath());
+    _fileSystem = _root.getFileSystem(_configuration);
+    _fileSystem.delete(_root, true);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    _fileSystem.delete(_root, true);
+  }
+
+  @Test
+  public void test1() throws IOException {
+    Path path = new Path(_root, "dir1");
+    HdfsDirectory dir = new HdfsDirectory(_configuration, path);
+    createFiles(_configuration, 10, 10, path, dir);
+    dir.close();
+
+    _fileSystem.delete(new Path(path, "file_manifest"), false);
+    long t1 = System.nanoTime();
+    // Rebuilds manifest
+    new HdfsDirectory(_configuration, path).close();
+    long t2 = System.nanoTime();
+    // Uses manifest
+    new HdfsDirectory(_configuration, path).close();
+    long t3 = System.nanoTime();
+    System.out.println("No manifest [" + (t2 - t1) / 1000000.0 + "]");
+    System.out.println("With manifest [" + (t3 - t2) / 1000000.0 + "]");
+  }
+
+  @Test
+  public void test2() throws IOException {
+    Path path = new Path(_root, "dir2");
+    {
+      HdfsDirectory dir1 = new HdfsDirectory(_configuration, path);
+      createFile(dir1, "file1");
+      createFile(dir1, "file2");
+      createFile(dir1, "file3");
+      dir1.close();
+    }
+
+    HdfsDirectory dir2 = new HdfsDirectory(_configuration, path);
+    String[] listAll = dir2.listAll();
+    assertEquals(3, listAll.length);
+    assertEquals(new HashSet<String>(Arrays.asList("file1", "file2", "file3")),
+        new HashSet<String>(Arrays.asList(listAll)));
+    for (String f : listAll) {
+      assertEquals(4, dir2.fileLength(f));
+    }
+    dir2.close();
+  }
+
+  @Test
+  public void test3() throws IOException {
+    Path path = new Path(_root, "dir3");
+    {
+      HdfsDirectory dir1 = new HdfsDirectory(_configuration, path);
+      createFile(dir1, "file1");
+      createFile(dir1, "file2");
+      createFile(dir1, "file3");
+      dir1.deleteFile("file3");
+      dir1.close();
+    }
+
+    HdfsDirectory dir2 = new HdfsDirectory(_configuration, path);
+    String[] listAll = dir2.listAll();
+    assertEquals(2, listAll.length);
+    assertEquals(new HashSet<String>(Arrays.asList("file1", "file2")), new HashSet<String>(Arrays.asList(listAll)));
+    for (String f : listAll) {
+      assertEquals(4, dir2.fileLength(f));
+    }
+    dir2.close();
+  }
+
+  @Test
+  public void test4() throws IOException {
+    Path path = new Path(_root, "dir4");
+    {
+      HdfsDirectory dir1 = new HdfsDirectory(_configuration, path);
+      createFile(dir1, "file1");
+      createFile(dir1, "file2");
+      createFile(dir1, "file3");
+      IndexOutput output = dir1.createOutput("file2", IOContext.DEFAULT);
+      output.writeLong(1L);
+      output.close();
+      dir1.close();
+    }
+
+    HdfsDirectory dir2 = new HdfsDirectory(_configuration, path);
+    String[] listAll = dir2.listAll();
+    assertEquals(3, listAll.length);
+    assertEquals(new HashSet<String>(Arrays.asList("file1", "file2", "file3")),
+        new HashSet<String>(Arrays.asList(listAll)));
+    for (String f : listAll) {
+      if (f.equals("file2")) {
+        assertEquals(8, dir2.fileLength(f));
+      } else {
+        assertEquals(4, dir2.fileLength(f));
+      }
+    }
+    dir2.close();
+  }
+
+  private void createFile(HdfsDirectory dir, String name) throws IOException {
+    IndexOutput output = dir.createOutput(name, IOContext.DEFAULT);
+    output.writeInt(1);
+    output.close();
+  }
+
+  private void createFiles(Configuration configuration, int numberOfDirs, int numberOfFiles, Path path,
+      HdfsDirectory mainDir) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    for (int d = 0; d < numberOfDirs; d++) {
+      Path dir = new Path(path, "dir." + d);
+      fileSystem.mkdirs(dir);
+      for (int f = 0; f < numberOfFiles; f++) {
+        Path p = new Path(dir, "file." + f);
+        FSDataOutputStream outputStream = fileSystem.create(p);
+        outputStream.write(1);
+        outputStream.close();
+      }
+      HdfsDirectory subDir = new HdfsDirectory(configuration, dir);
+      for (String file : subDir.listAll()) {
+        subDir.copy(mainDir, file, UUID.randomUUID().toString(), IOContext.READ);
+      }
+      subDir.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d190c10c/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
index 7496306..63f1210 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
@@ -69,7 +69,7 @@ public class HdfsDirectoryResourceTest {
   public void testResourceTracking() throws IOException, InterruptedException {
     Path path = new Path(_root, "testResourceTracking");
     boolean resourceTracking = true;
-    HdfsDirectory dir = new HdfsDirectory(_configuration, path, null, null, resourceTracking);
+    HdfsDirectory dir = new HdfsDirectory(_configuration, path, null, resourceTracking);
     try {
       String name = "_1.file";
       executeWrites(dir, name);

