incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Added symlink like data structure to the HdfsDirectory. This will prevent data loss for MapReduce outputs in the event that the index commits do not succeed. Also once the link files are no longer referenced the directory will be removed. A
Date Mon, 03 Mar 2014 21:24:41 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/apache-blur-0.2 36c4adea3 -> bfb49c739


Added symlink like data structure to the HdfsDirectory.  This will prevent data loss for MapReduce outputs in the event that the index commits do not succeed.  Also once the link files are no longer referenced the directory will be removed.  All directories that are in use by the main directory through the use of symlinks will be named with a suffix of '.inuse'.  If these directories are removed, the index will become corrupt.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/bfb49c73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/bfb49c73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/bfb49c73

Branch: refs/heads/apache-blur-0.2
Commit: bfb49c7393c9ff6fcf5808a433d624458a4906b0
Parents: 36c4ade
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Mar 3 16:24:36 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Mar 3 16:24:36 2014 -0500

----------------------------------------------------------------------
 .../blur/manager/writer/IndexImporter.java      | 150 ++++++++++----
 .../blur/utils/TableShardCountCollapser.java    |   5 +-
 .../blur/manager/writer/IndexImporterTest.java  |  79 ++++++--
 .../lib/BlurOutputFormatMiniClusterTest.java    |   2 +-
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 142 +++++++++----
 .../apache/blur/store/hdfs/HdfsQuickMove.java   |  23 ---
 .../org/apache/blur/store/hdfs/HdfsSymlink.java |  23 +++
 .../blur/store/hdfs/SoftlinkHdfsDirectory.java  | 201 -------------------
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      |  21 +-
 .../blur/store/hdfs_v2/JoinDirectory.java       |   6 +-
 .../store/SoftlinkHdfsDirectoryTestSuite.java   |  49 -----
 .../store/hdfs/HdfsDirectorySymlinkTest.java    | 124 ++++++++++++
 .../hdfs_v2/FastHdfsKeyValueDirectoryTest.java  |   2 +-
 13 files changed, 434 insertions(+), 393 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
index d8e9c44..2600725 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -19,11 +19,13 @@ package org.apache.blur.manager.writer;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,6 +35,7 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.BlurPartitioner;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
@@ -55,6 +58,12 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 
 public class IndexImporter extends TimerTask implements Closeable {
+
+  private static final String BADROWIDS = ".badrowids";
+  private static final String COMMIT = ".commit";
+  private static final String INUSE = ".inuse";
+  private static final String BADINDEX = ".badindex";
+
   private final static Log LOG = LogFactory.getLog(IndexImporter.class);
 
   private final BlurIndex _blurIndex;
@@ -64,6 +73,9 @@ public class IndexImporter extends TimerTask implements Closeable {
   private final String _shard;
   private final AtomicBoolean _running = new AtomicBoolean();
 
+  private long _lastCleanup;
+  private final long _cleanupDelay;
+
   public IndexImporter(BlurIndex blurIndex, ShardContext shardContext, TimeUnit refreshUnit, long refreshAmount) {
     _running.set(true);
     _blurIndex = blurIndex;
@@ -74,6 +86,7 @@ public class IndexImporter extends TimerTask implements Closeable {
     _timer.schedule(this, period, period);
     _table = _shardContext.getTableContext().getTable();
     _shard = _shardContext.getShard();
+    _cleanupDelay = TimeUnit.MINUTES.toMillis(10);
   }
 
   @Override
@@ -87,6 +100,14 @@ public class IndexImporter extends TimerTask implements Closeable {
 
   @Override
   public void run() {
+    if (_lastCleanup + _cleanupDelay < System.currentTimeMillis()) {
+      try {
+        cleanupOldDirs();
+      } catch (IOException e) {
+        LOG.error("Unknown error while trying to clean old directories on [{1}/{2}].", e, _shard, _table);
+      }
+      _lastCleanup = System.currentTimeMillis();
+    }
     Path path = _shardContext.getHdfsDirPath();
     Configuration configuration = _shardContext.getTableContext().getConfiguration();
     try {
@@ -100,7 +121,7 @@ public class IndexImporter extends TimerTask implements Closeable {
           listStatus = sort(fileSystem.listStatus(path, new PathFilter() {
             @Override
             public boolean accept(Path path) {
-              if (path != null && path.getName().endsWith(".commit")) {
+              if (path != null && path.getName().endsWith(COMMIT)) {
                 return true;
               }
               return false;
@@ -116,50 +137,53 @@ public class IndexImporter extends TimerTask implements Closeable {
           return;
         }
       }
-      List<HdfsDirectory> indexesToImport = new ArrayList<HdfsDirectory>();
       for (FileStatus fileStatus : listStatus) {
         Path file = fileStatus.getPath();
-        if (fileStatus.isDir() && file.getName().endsWith(".commit")) {
-          HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, file);
-          if (!DirectoryReader.indexExists(hdfsDirectory)) {
-            LOG.error("Directory found at [{0}] is not a vaild index.", file);
+        if (fileStatus.isDir() && file.getName().endsWith(COMMIT)) {
+          // rename to inuse, if good continue else rename to badindex
+          Path inuse = new Path(file.getParent(), rename(file.getName(), INUSE));
+          if (fileSystem.rename(file, inuse)) {
+            HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, inuse);
+            if (DirectoryReader.indexExists(hdfsDirectory)) {
+              IndexAction indexAction = getIndexAction(hdfsDirectory, fileSystem);
+              _blurIndex.process(indexAction);
+              return;
+            } else {
+              Path badindex = new Path(file.getParent(), rename(file.getName(), BADINDEX));
+              if (fileSystem.rename(inuse, badindex)) {
+                LOG.error("Directory found at [{0}] is not a vaild index, renaming to [{1}].", inuse, badindex);
+              } else {
+                LOG.fatal("Directory found at [{0}] is not a vaild index, could not rename to [{1}].", inuse, badindex);
+              }
+            }
           } else {
-            indexesToImport.add(hdfsDirectory);
+            LOG.fatal("Could not rename [{0}] to inuse dir.", file);
           }
         }
       }
-      if (indexesToImport.isEmpty()) {
-        return;
-      }
-
-      IndexAction indexAction = getIndexAction(indexesToImport, fileSystem);
-      _blurIndex.process(indexAction);
     } catch (IOException e) {
       LOG.error("Unknown error while trying to refresh imports on [{1}/{2}].", e, _shard, _table);
     }
   }
 
-  private IndexAction getIndexAction(final List<HdfsDirectory> indexesToImport, final FileSystem fileSystem) {
-    return new IndexAction() {
+  private String rename(String name, String newSuffix) {
+    int lastIndexOf = name.lastIndexOf('.');
+    return name.substring(0, lastIndexOf) + newSuffix;
+  }
 
-      private Path _dirPath;
+  private IndexAction getIndexAction(final HdfsDirectory directory, final FileSystem fileSystem) {
+    return new IndexAction() {
 
       @Override
       public void performMutate(IndexSearcherClosable searcher, IndexWriter writer) throws IOException {
-        for (Directory directory : indexesToImport) {
-          LOG.info("About to import [{0}] into [{1}/{2}]", directory, _shard, _table);
-        }
-        LOG.info("Obtaining lock on [{0}/{1}]", _shard, _table);
-        for (HdfsDirectory directory : indexesToImport) {
-          boolean emitDeletes = searcher.getIndexReader().numDocs() != 0;
-          _dirPath = directory.getPath();
-          applyDeletes(directory, writer, _shard, emitDeletes);
-          LOG.info("Add index [{0}] [{1}/{2}]", directory, _shard, _table);
-          writer.addIndexes(directory);
-          LOG.info("Removing delete markers [{0}] on [{1}/{2}]", directory, _shard, _table);
-          writer.deleteDocuments(new Term(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE));
-          LOG.info("Finishing import [{0}], commiting on [{1}/{2}]", directory, _shard, _table);
-        }
+        LOG.info("About to import [{0}] into [{1}/{2}]", directory, _shard, _table);
+        boolean emitDeletes = searcher.getIndexReader().numDocs() != 0;
+        applyDeletes(directory, writer, _shard, emitDeletes);
+        LOG.info("Add index [{0}] [{1}/{2}]", directory, _shard, _table);
+        writer.addIndexes(directory);
+        LOG.info("Removing delete markers [{0}] on [{1}/{2}]", directory, _shard, _table);
+        writer.deleteDocuments(new Term(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE));
+        LOG.info("Finishing import [{0}], commiting on [{1}/{2}]", directory, _shard, _table);
       }
 
       @Override
@@ -169,10 +193,6 @@ public class IndexImporter extends TimerTask implements Closeable {
 
       @Override
       public void doPostCommit(IndexWriter writer) throws IOException {
-        LOG.info("Calling maybeMerge on the index [{0}] for [{1}/{2}]", _dirPath, _shard, _table);
-        writer.maybeMerge();
-        LOG.info("Cleaning up old directory [{0}] for [{1}/{2}]", _dirPath, _shard, _table);
-        fileSystem.delete(_dirPath, true);
         LOG.info("Import complete on [{0}/{1}]", _shard, _table);
       }
 
@@ -184,10 +204,9 @@ public class IndexImporter extends TimerTask implements Closeable {
       @Override
       public void doPostRollback(IndexWriter writer) throws IOException {
         LOG.info("Finished rollback on [{0}/{1}]", _shard, _table);
-        String name = _dirPath.getName();
-        int lastIndexOf = name.lastIndexOf('.');
-        String badRowIdsName = name.substring(0, lastIndexOf) + ".bad_rowids";
-        fileSystem.rename(_dirPath, new Path(_dirPath.getParent(), badRowIdsName));
+        Path path = directory.getPath();
+        String name = path.getName();
+        fileSystem.rename(path, new Path(path.getParent(), rename(name, BADROWIDS)));
       }
     };
   }
@@ -235,4 +254,57 @@ public class IndexImporter extends TimerTask implements Closeable {
       reader.close();
     }
   }
+
+  public void cleanupOldDirs() throws IOException {
+    Path hdfsDirPath = _shardContext.getHdfsDirPath();
+    TableContext tableContext = _shardContext.getTableContext();
+    Configuration configuration = tableContext.getConfiguration();
+    FileSystem fileSystem = hdfsDirPath.getFileSystem(configuration);
+    FileStatus[] inuseSubDirs = fileSystem.listStatus(hdfsDirPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(INUSE);
+      }
+    });
+    Set<Path> inuseDirs = toSet(inuseSubDirs);
+    Map<Path, Path> inuseFileToDir = toMap(fileSystem, inuseDirs);
+    FileStatus[] listStatus = fileSystem.listStatus(hdfsDirPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(HdfsDirectory.LNK);
+      }
+    });
+
+    for (FileStatus status : listStatus) {
+      Path realPath = HdfsDirectory.readRealPathDataFromSymlinkPath(fileSystem, status.getPath());
+      Path inuseDir = inuseFileToDir.get(realPath);
+      inuseDirs.remove(inuseDir);
+    }
+
+    for (Path p : inuseDirs) {
+      LOG.info("Deleteing path [{0}] no longer in use.", p);
+      fileSystem.delete(p, true);
+    }
+  }
+
+  private Map<Path, Path> toMap(FileSystem fileSystem, Set<Path> inuseDirs) throws IOException {
+    Map<Path, Path> result = new TreeMap<Path, Path>();
+    for (Path p : inuseDirs) {
+      if (!fileSystem.isFile(p)) {
+        FileStatus[] listStatus = fileSystem.listStatus(p);
+        for (FileStatus status : listStatus) {
+          result.put(status.getPath(), p);
+        }
+      }
+    }
+    return result;
+  }
+
+  private Set<Path> toSet(FileStatus[] dirs) {
+    Set<Path> result = new TreeSet<Path>();
+    for (FileStatus status : dirs) {
+      result.add(status.getPath());
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-core/src/main/java/org/apache/blur/utils/TableShardCountCollapser.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/TableShardCountCollapser.java b/blur-core/src/main/java/org/apache/blur/utils/TableShardCountCollapser.java
index 1e91baf..e8ebd76 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/TableShardCountCollapser.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/TableShardCountCollapser.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import org.apache.blur.lucene.codec.Blur021Codec;
 import org.apache.blur.lucene.codec.Blur022Codec;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +37,7 @@ import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
+
 /**
  * This class is used to reduce the total number of shards of a table. The main
  * use would be if during an indexing job the number of reducers were increased
@@ -143,6 +143,9 @@ public class TableShardCountCollapser extends Configured implements Tool {
         pathsToDelete[p - 1] = pathToMerge;
       }
       indexWriter.addIndexes(dirs);
+      // Causes rewrite of of index and the symlinked files are
+      // merged/rewritten.
+      indexWriter.forceMerge(1);
       indexWriter.close();
       FileSystem fileSystem = path.getFileSystem(getConf());
       for (Path p : pathsToDelete) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index 407788a..5e11ac3 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -46,7 +46,7 @@ import org.apache.lucene.document.Field;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.store.Directory;
 import org.junit.After;
 import org.junit.Before;
@@ -57,7 +57,7 @@ public class IndexImporterTest {
   private static final Path TMPDIR = new Path("target/tmp");
 
   private Path _base;
-  private Configuration configuration;
+  private Configuration _configuration;
   private IndexWriter _commitWriter;
   private IndexImporter _indexImporter;
   private Random _random = new Random();
@@ -65,18 +65,21 @@ public class IndexImporterTest {
   private Path _badRowIdsPath;
   private IndexWriter _mainWriter;
   private FileSystem _fileSystem;
-
   private FieldManager _fieldManager;
+  private Path _badIndexPath;
+  private Path _inUsePath;
+  private Path _shardPath;
+  private HdfsDirectory _mainDirectory;
 
   @Before
   public void setup() throws IOException {
     TableContext.clear();
-    configuration = new Configuration();
+    _configuration = new Configuration();
     _base = new Path(TMPDIR, "blur-index-importer-test");
-    _fileSystem = _base.getFileSystem(configuration);
+    _fileSystem = _base.getFileSystem(_configuration);
     _fileSystem.delete(_base, true);
     _fileSystem.mkdirs(_base);
-    setupWriter(configuration);
+    setupWriter(_configuration);
   }
 
   private void setupWriter(Configuration configuration) throws IOException {
@@ -90,25 +93,29 @@ public class IndexImporterTest {
     TableContext tableContext = TableContext.create(tableDescriptor);
     ShardContext shardContext = ShardContext.create(tableContext, "shard-00000000");
     Path tablePath = new Path(_base, "table-table");
-    Path shardPath = new Path(tablePath, "shard-00000000");
+    _shardPath = new Path(tablePath, "shard-00000000");
     String indexDirName = "index_" + uuid;
-    _path = new Path(shardPath, indexDirName + ".commit");
+    _path = new Path(_shardPath, indexDirName + ".commit");
     _fileSystem.mkdirs(_path);
-    _badRowIdsPath = new Path(shardPath, indexDirName + ".bad_rowids");
+    _badRowIdsPath = new Path(_shardPath, indexDirName + ".badrowids");
+    _badIndexPath = new Path(_shardPath, indexDirName + ".badindex");
+    _inUsePath = new Path(_shardPath, indexDirName + ".inuse");
     Directory commitDirectory = new HdfsDirectory(configuration, _path);
-    Directory mainDirectory = new HdfsDirectory(configuration, shardPath);
+    _mainDirectory = new HdfsDirectory(configuration, _shardPath);
     _fieldManager = tableContext.getFieldManager();
     Analyzer analyzerForIndex = _fieldManager.getAnalyzerForIndex();
     IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzerForIndex);
-    conf.setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES);
+    // conf.setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES);
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
     _commitWriter = new IndexWriter(commitDirectory, conf.clone());
 
     // Make sure there's an empty index...
-    new IndexWriter(mainDirectory, conf.clone()).close();
-    _mainWriter = new IndexWriter(mainDirectory, conf.clone());
+    new IndexWriter(_mainDirectory, conf.clone()).close();
+    _mainWriter = new IndexWriter(_mainDirectory, conf.clone());
     BufferStore.initNewBuffer(128, 128 * 128);
 
-    _indexImporter = new IndexImporter(getBlurIndex(shardContext, mainDirectory), shardContext, TimeUnit.MINUTES, 10);
+    _indexImporter = new IndexImporter(getBlurIndex(shardContext, _mainDirectory), shardContext, TimeUnit.MINUTES, 10);
   }
 
   private BlurIndex getBlurIndex(ShardContext shardContext, final Directory mainDirectory) throws IOException {
@@ -188,7 +195,46 @@ public class IndexImporterTest {
     IOUtils.closeQuietly(_commitWriter);
     IOUtils.closeQuietly(_mainWriter);
     IOUtils.closeQuietly(_indexImporter);
-    _base.getFileSystem(configuration).delete(_base, true);
+    _base.getFileSystem(_configuration).delete(_base, true);
+  }
+
+  @Test
+  public void testIndexImporterWithBadIndex() throws IOException {
+    _fileSystem.delete(_path, true);
+    _fileSystem.mkdirs(_path);
+    _indexImporter.run();
+    assertFalse(_fileSystem.exists(_path));
+    assertFalse(_fileSystem.exists(_badRowIdsPath));
+    assertTrue(_fileSystem.exists(_badIndexPath));
+  }
+
+  @Test
+  public void testIndexImporterCheckInuseDirsForCleanup() throws IOException {
+    FileSystem fileSystem = _shardPath.getFileSystem(_configuration);
+
+    List<Field> document1 = _fieldManager.getFields("10", genRecord("1"));
+    _mainWriter.addDocument(document1);
+    _mainWriter.commit();
+
+    List<Field> document2 = _fieldManager.getFields("1", genRecord("1"));
+    _commitWriter.addDocument(document2);
+    _commitWriter.commit();
+    _commitWriter.close();
+    _indexImporter.run();
+
+    assertFalse(_fileSystem.exists(_path));
+    assertFalse(_fileSystem.exists(_badRowIdsPath));
+    assertTrue(_fileSystem.exists(_inUsePath));
+
+    DirectoryReader reader = DirectoryReader.open(_mainDirectory);
+    while (reader.leaves().size() > 1) {
+      reader.close();
+      _mainWriter.forceMerge(1, true);
+      _mainWriter.commit();
+      reader = DirectoryReader.open(_mainDirectory);
+    }
+    _indexImporter.cleanupOldDirs();
+    assertFalse(fileSystem.exists(_path));
   }
 
   @Test
@@ -200,11 +246,11 @@ public class IndexImporterTest {
     _indexImporter.run();
     assertFalse(_fileSystem.exists(_path));
     assertFalse(_fileSystem.exists(_badRowIdsPath));
+    assertTrue(_fileSystem.exists(_inUsePath));
   }
 
   @Test
   public void testIndexImporterWithWrongRowIdShardCombination() throws IOException {
-    setupWriter(configuration);
     List<Field> document = _fieldManager.getFields("2", genRecord("1"));
     _commitWriter.addDocument(document);
     _commitWriter.commit();
@@ -212,6 +258,7 @@ public class IndexImporterTest {
     _indexImporter.run();
     assertFalse(_fileSystem.exists(_path));
     assertTrue(_fileSystem.exists(_badRowIdsPath));
+    assertFalse(_fileSystem.exists(_inUsePath));
   }
 
   private Record genRecord(String recordId) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
index d44fed8..c14e86e 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
@@ -185,7 +185,7 @@ public class BlurOutputFormatMiniClusterTest {
 
     FileStatus[] listStatusAfter = fileSystem.listStatus(shardPath);
 
-    assertEquals(10, listStatusAfter.length);
+    assertEquals(11, listStatusAfter.length);
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 82d16d4..bdf4f31 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -22,6 +22,7 @@ import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashSet;
@@ -55,7 +56,11 @@ import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.Meter;
 import com.yammer.metrics.core.MetricName;
 
-public class HdfsDirectory extends Directory implements LastModified, HdfsQuickMove {
+public class HdfsDirectory extends Directory implements LastModified, HdfsSymlink {
+
+  public static final String LNK = ".lnk";
+
+  private static final String UTF_8 = "UTF-8";
 
   private static final Log LOG = LogFactory.getLog(HdfsDirectory.class);
 
@@ -67,13 +72,15 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
   protected final Path _path;
   protected final FileSystem _fileSystem;
   protected final MetricsGroup _metricsGroup;
-  protected final Map<String, Long> _fileMap = new ConcurrentHashMap<String, Long>();
+  protected final Map<String, Long> _fileLengthMap = new ConcurrentHashMap<String, Long>();
+  protected final Map<String, Boolean> _symlinkMap = new ConcurrentHashMap<String, Boolean>();
+  protected final Map<String, Path> _symlinkPathMap = new ConcurrentHashMap<String, Path>();
   protected final Map<Path, FSDataInputStream> _inputMap = new ConcurrentHashMap<Path, FSDataInputStream>();
   protected final boolean _useCache = true;
 
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
-    this._path = path;
     _fileSystem = path.getFileSystem(configuration);
+    _path = _fileSystem.makeQualified(path);
     _fileSystem.mkdirs(path);
     setLockFactory(NoLockFactory.getNoLockFactory());
     synchronized (_metricsGroupMap) {
@@ -90,7 +97,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
       FileStatus[] listStatus = _fileSystem.listStatus(_path);
       for (FileStatus fileStatus : listStatus) {
         if (!fileStatus.isDir()) {
-          _fileMap.put(fileStatus.getPath().getName(), fileStatus.getLen());
+          _fileLengthMap.put(fileStatus.getPath().getName(), fileStatus.getLen());
         }
       }
     }
@@ -120,7 +127,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
     if (fileExists(name)) {
       throw new IOException("File [" + name + "] already exists found.");
     }
-    _fileMap.put(name, 0L);
+    _fileLengthMap.put(name, 0L);
     final FSDataOutputStream outputStream = openForOutput(name);
     return new BufferedIndexOutput() {
 
@@ -141,7 +148,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
       @Override
       public void close() throws IOException {
         super.close();
-        _fileMap.put(name, outputStream.getPos());
+        _fileLengthMap.put(name, outputStream.getPos());
         outputStream.close();
         openForInput(name);
       }
@@ -195,7 +202,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
     LOG.debug("listAll [{0}]", _path);
 
     if (_useCache) {
-      Set<String> names = new HashSet<String>(_fileMap.keySet());
+      Set<String> names = new HashSet<String>(_fileLengthMap.keySet());
       return names.toArray(new String[names.size()]);
     }
 
@@ -225,7 +232,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
   public boolean fileExists(String name) throws IOException {
     LOG.debug("fileExists [{0}] [{1}]", name, _path);
     if (_useCache) {
-      return _fileMap.containsKey(name);
+      return _fileLengthMap.containsKey(name);
     }
     return exists(name);
   }
@@ -245,7 +252,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
     LOG.debug("deleteFile [{0}] [{1}]", name, _path);
     if (fileExists(name)) {
       if (_useCache) {
-        _fileMap.remove(name);
+        _fileLengthMap.remove(name);
       }
       delete(name);
     } else {
@@ -254,12 +261,16 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
   }
 
   protected void delete(String name) throws IOException {
-    Path path = getPath(name);
+    Path path = getPathOrSymlink(name);
     FSDataInputStream inputStream = _inputMap.remove(path);
     Tracer trace = Trace.trace("filesystem - delete", Trace.param("path", path));
     if (inputStream != null) {
       inputStream.close();
     }
+    if (_useCache) {
+      _symlinkMap.remove(name);
+      _symlinkPathMap.remove(name);
+    }
     try {
       _fileSystem.delete(path, true);
     } finally {
@@ -271,7 +282,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
   public long fileLength(String name) throws IOException {
     LOG.debug("fileLength [{0}] [{1}]", name, _path);
     if (_useCache) {
-      Long length = _fileMap.get(name);
+      Long length = _fileLengthMap.get(name);
       if (length == null) {
         throw new FileNotFoundException(name);
       }
@@ -305,10 +316,71 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
     return _path;
   }
 
-  private Path getPath(String name) {
+  private Path getPath(String name) throws IOException {
+    if (isSymlink(name)) {
+      return getRealFilePathFromSymlink(name);
+    }
     return new Path(_path, name);
   }
 
+  private Path getPathOrSymlink(String name) throws IOException {
+    if (isSymlink(name)) {
+      return new Path(_path, name + LNK);
+    }
+    return new Path(_path, name);
+  }
+
+  private Path getRealFilePathFromSymlink(String name) throws IOException {
+    // need to cache
+    if (_useCache) {
+      Path path = _symlinkPathMap.get(name);
+      if (path != null) {
+        return path;
+      }
+    }
+    Tracer trace = Trace.trace("filesystem - getRealFilePathFromSymlink", Trace.param("name", name));
+    try {
+      Path linkPath = new Path(_path, name + LNK);
+      Path path = readRealPathDataFromSymlinkPath(_fileSystem, linkPath);
+      if (_useCache) {
+        _symlinkPathMap.put(name, path);
+      }
+      return path;
+    } finally {
+      trace.done();
+    }
+  }
+
+  public static Path readRealPathDataFromSymlinkPath(FileSystem fileSystem, Path linkPath) throws IOException,
+      UnsupportedEncodingException {
+    FileStatus fileStatus = fileSystem.getFileStatus(linkPath);
+    FSDataInputStream inputStream = fileSystem.open(linkPath);
+    byte[] buf = new byte[(int) fileStatus.getLen()];
+    inputStream.readFully(buf);
+    inputStream.close();
+    Path path = new Path(new String(buf, UTF_8));
+    return path;
+  }
+
+  private boolean isSymlink(String name) throws IOException {
+    if (_useCache) {
+      Boolean b = _symlinkMap.get(name);
+      if (b != null) {
+        return b;
+      }
+    }
+    Tracer trace = Trace.trace("filesystem - isSymlink", Trace.param("name", name));
+    try {
+      boolean exists = _fileSystem.exists(new Path(_path, name + LNK));
+      if (_useCache) {
+        _symlinkMap.put(name, exists);
+      }
+      return exists;
+    } finally {
+      trace.done();
+    }
+  }
+
   public long getFileModified(String name) throws IOException {
     if (!fileExists(name)) {
       throw new FileNotFoundException("File [" + name + "] not found");
@@ -329,46 +401,38 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsQuickM
   @Override
   public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
     if (to instanceof DirectoryDecorator) {
+      // Unwrap original directory
       copy(((DirectoryDecorator) to).getOriginalDirectory(), src, dest, context);
-    } else if (to instanceof HdfsQuickMove) {
-      if (quickMove(((HdfsQuickMove) to).getQuickMoveDirectory(), src, dest, context)) {
+      return;
+    } else if (to instanceof HdfsSymlink) {
+      // Attempt to create a symlink and return.
+      if (createSymLink(((HdfsSymlink) to).getSymlinkDirectory(), src, dest)) {
         return;
       }
-    } else {
-      slowCopy(to, src, dest, context);
     }
-  }
-
-  protected void slowCopy(Directory to, String src, String dest, IOContext context) throws IOException {
+    // if all else fails, just copy the file.
     super.copy(to, src, dest, context);
   }
 
-  private boolean quickMove(Directory to, String src, String dest, IOContext context) throws IOException {
-    LOG.warn("DANGEROUS copy [{0}] [{1}] [{2}] [{3}] [{4}]", to, src, dest, context, _path);
-    HdfsDirectory simpleTo = (HdfsDirectory) to;
-    if (ifSameCluster(simpleTo, this)) {
-      Path newDest = simpleTo.getPath(dest);
-      Path oldSrc = getPath(src);
-      if (_useCache) {
-        simpleTo._fileMap.put(dest, _fileMap.get(src));
-        _fileMap.remove(src);
-        FSDataInputStream inputStream = _inputMap.get(src);
-        if (inputStream != null) {
-          inputStream.close();
-        }
-      }
-      return _fileSystem.rename(oldSrc, newDest);
+  private boolean createSymLink(HdfsDirectory to, String src, String dest) throws IOException {
+    Path srcPath = getPath(src);
+    Path destDir = to.getPath();
+    LOG.info("Creating symlink with name [{0}] to [{1}]", dest, srcPath);
+    FSDataOutputStream outputStream = _fileSystem.create(getSymPath(destDir, dest));
+    outputStream.write(srcPath.toString().getBytes(UTF_8));
+    outputStream.close();
+    if (_useCache) {
+      to._fileLengthMap.put(dest, _fileLengthMap.get(src));
     }
-    return false;
+    return true;
   }
 
-  private boolean ifSameCluster(HdfsDirectory dest, HdfsDirectory src) {
-    // @TODO
-    return true;
+  private Path getSymPath(Path destDir, String destFilename) {
+    return new Path(destDir, destFilename + LNK);
   }
 
   @Override
-  public HdfsDirectory getQuickMoveDirectory() {
+  public HdfsDirectory getSymlinkDirectory() {
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsQuickMove.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsQuickMove.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsQuickMove.java
deleted file mode 100644
index 9504ca7..0000000
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsQuickMove.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.store.hdfs;
-
-public interface HdfsQuickMove {
-  
-  HdfsDirectory getQuickMoveDirectory();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsSymlink.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsSymlink.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsSymlink.java
new file mode 100644
index 0000000..82aac84
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsSymlink.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+public interface HdfsSymlink {
+  
+  HdfsDirectory getSymlinkDirectory();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-store/src/main/java/org/apache/blur/store/hdfs/SoftlinkHdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/SoftlinkHdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/SoftlinkHdfsDirectory.java
deleted file mode 100644
index deee52f..0000000
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/SoftlinkHdfsDirectory.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.store.hdfs;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.UUID;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-
-public class SoftlinkHdfsDirectory extends HdfsDirectory {
-
-  private static final Log LOG = LogFactory.getLog(HdfsDirectory.class);
-
-  private static final String UTF_8 = "UTF-8";
-  private static final String EXT = ".blur_lnk";
-  private final Path _storePath;
-  private final Path _linkPath;
-
-  /**
-   * Creates a new SoftlinkHdfsDirectory.
-   * 
-   * @param configuration
-   *          the {@link Configuration} object.
-   * @param storePath
-   *          the path where the data is actually stored.
-   * @param linkPath
-   *          the path where the links are stored.
-   * @throws IOException
-   */
-  public SoftlinkHdfsDirectory(Configuration configuration, Path storePath, Path linkPath) throws IOException {
-    super(configuration, linkPath);
-    FileSystem fileSystem = storePath.getFileSystem(configuration);
-    _storePath = fileSystem.makeQualified(storePath);
-    _linkPath = fileSystem.makeQualified(linkPath);
-  }
-
-  @Override
-  protected FSDataOutputStream openForOutput(String name) throws IOException {
-    createLinkForNewFile(name);
-    return _fileSystem.create(getDataPath(name));
-  }
-
-  private void createLinkForNewFile(String name) throws IOException {
-    String uuid = UUID.randomUUID().toString();
-    Path dataPath = new Path(_storePath, uuid);
-    createLinkForNewFile(name, dataPath.toUri());
-  }
-
-  private void createLinkForNewFile(String name, URI uri) throws IOException, UnsupportedEncodingException {
-    String uriStr = uri.toString();
-    Path linkPath = createLinkPath(name);
-    FSDataOutputStream outputStream = _fileSystem.create(linkPath, false);
-    outputStream.write(uriStr.getBytes(UTF_8));
-    outputStream.close();
-  }
-
-  private Path getDataPath(String name) throws IOException {
-    Path linkPath = createLinkPath(name);
-    boolean exists = _fileSystem.exists(linkPath);
-    if (exists) {
-      return new Path(getUri(linkPath));
-    } else {
-      return new Path(_linkPath, name);
-    }
-  }
-
-  private URI getUri(Path linkPath) throws IOException {
-    FileStatus fileStatus = _fileSystem.getFileStatus(linkPath);
-    byte[] buf = new byte[(int) fileStatus.getLen()];
-    FSDataInputStream inputStream = _fileSystem.open(linkPath);
-    inputStream.readFully(buf);
-    inputStream.close();
-    try {
-      return new URI(new String(buf, UTF_8));
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-  }
-
-  private Path getLinkPath(String name) throws IOException {
-    Path linkPath = createLinkPath(name);
-    boolean exists = _fileSystem.exists(linkPath);
-    if (exists) {
-      return new Path(_linkPath, name + EXT);
-    } else {
-      return new Path(_linkPath, name);
-    }
-  }
-
-  private Path createLinkPath(String name) {
-    return new Path(_linkPath, name + EXT);
-  }
-
-  private String removeLinkExtensionSuffix(String name) {
-    if (name.endsWith(EXT)) {
-      return name.substring(0, name.length() - EXT.length());
-    }
-    return name;
-  }
-
-  @Override
-  public String[] listAll() throws IOException {
-    LOG.debug("listAll [{0}]", _path);
-    FileStatus[] files = _fileSystem.listStatus(_path, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        try {
-          return _fileSystem.isFile(path);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    });
-    String[] result = new String[files.length];
-    for (int i = 0; i < result.length; i++) {
-      result[i] = removeLinkExtensionSuffix(files[i].getPath().getName());
-    }
-    return result;
-  }
-
-  @Override
-  protected FSDataInputStream openForInput(String name) throws IOException {
-    return _fileSystem.open(getDataPath(name));
-  }
-
-  @Override
-  protected boolean exists(String name) throws IOException {
-    return _fileSystem.exists(getLinkPath(name));
-  }
-
-  @Override
-  protected void delete(String name) throws IOException {
-    _fileSystem.delete(getLinkPath(name), true);
-  }
-
-  @Override
-  protected long length(String name) throws IOException {
-    return _fileSystem.getFileStatus(getDataPath(name)).getLen();
-  }
-
-  @Override
-  protected long fileModified(String name) throws IOException {
-    return _fileSystem.getFileStatus(getDataPath(name)).getModificationTime();
-  }
-
-  @Override
-  public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
-    if (to instanceof DirectoryDecorator) {
-      copy(((DirectoryDecorator) to).getOriginalDirectory(), src, dest, context);
-      return;
-    } else if (to instanceof SoftlinkHdfsDirectory) {
-      LOG.warn("This needs to be tested....");
-      SoftlinkHdfsDirectory softlinkHdfsDirectory = (SoftlinkHdfsDirectory) to;
-      if (canQuickCopy(softlinkHdfsDirectory, src)) {
-        Path linkPath = getLinkPath(src);
-        softlinkHdfsDirectory.quickCopy(linkPath, dest);
-      }
-    }
-    slowCopy(to, src, dest, context);
-  }
-
-  private void quickCopy(Path linkPath, String dest) throws IOException {
-    URI uri = getUri(linkPath);
-    createLinkForNewFile(dest, uri);
-  }
-
-  private boolean canQuickCopy(SoftlinkHdfsDirectory to, String src) throws IOException {
-    if (to._storePath.equals(_storePath)) {
-      return _fileSystem.exists(createLinkPath(src));
-    }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
index c87e4ef..21e3248 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -44,29 +44,11 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
   private static final String LENGTH = "/length";
   private static final BytesRef FILES = new BytesRef("FILES");
   private static final String SEP = "|";
-  
+
   private final Map<String, Long> _files = new ConcurrentHashMap<String, Long>();
   private final HdfsKeyValueStore _store;
   private final int _blockSize = 4096;
 
-  public static void main(String[] args) throws IOException {
-    Configuration configuration = new Configuration();
-    Path path = new Path("hdfs://localhost:9000/blur/fast/shard-00000000/fast");
-    FastHdfsKeyValueDirectory dir = new FastHdfsKeyValueDirectory(configuration, path);
-    HdfsKeyValueStore store = dir._store;
-    store.cleanupOldFiles();
-    String[] listAll = dir.listAll();
-    long total = 0;
-    for (String s : listAll) {
-      long fileLength = dir.fileLength(s);
-      System.out.println(s + " " + fileLength);
-      total += fileLength;
-    }
-    System.out.println("Total [" + total + "]");
-    dir.close();
-
-  }
-
   public FastHdfsKeyValueDirectory(Configuration configuration, Path path) throws IOException {
     _store = new HdfsKeyValueStore(configuration, path);
     BytesRef value = new BytesRef();
@@ -178,5 +160,4 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
     }
     throw new FileNotFoundException(name);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
index 84f91a3..9e872a3 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
@@ -25,14 +25,14 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.blur.store.blockcache.LastModified;
 import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.store.hdfs.HdfsQuickMove;
+import org.apache.blur.store.hdfs.HdfsSymlink;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 
-public class JoinDirectory extends Directory implements LastModified, HdfsQuickMove {
+public class JoinDirectory extends Directory implements LastModified, HdfsSymlink {
 
   private final HdfsDirectory _longTermStorage;
   private final Directory _shortTermStorage;
@@ -159,7 +159,7 @@ public class JoinDirectory extends Directory implements LastModified, HdfsQuickM
   }
 
   @Override
-  public HdfsDirectory getQuickMoveDirectory() {
+  public HdfsDirectory getSymlinkDirectory() {
     return _longTermStorage;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-store/src/test/java/org/apache/blur/store/SoftlinkHdfsDirectoryTestSuite.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/SoftlinkHdfsDirectoryTestSuite.java b/blur-store/src/test/java/org/apache/blur/store/SoftlinkHdfsDirectoryTestSuite.java
deleted file mode 100644
index 28a49d1..0000000
--- a/blur-store/src/test/java/org/apache/blur/store/SoftlinkHdfsDirectoryTestSuite.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.blur.store;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.blur.store.hdfs.SoftlinkHdfsDirectory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.Directory;
-import org.junit.Test;
-
-public class SoftlinkHdfsDirectoryTestSuite extends BaseDirectoryTestSuite {
-
-  @Override
-  protected Directory setupDirectory() throws IOException {
-    URI uri = new File(file, "hdfs").toURI();
-    Path hdfsDirPath = new Path(uri.toString());
-    Path store = new Path(hdfsDirPath, "store");
-    Path link = new Path(hdfsDirPath, "link");
-    Configuration conf = new Configuration();
-    return new SoftlinkHdfsDirectory(conf, store, link);
-  }
-
-  @Test
-  public void runsTheTests() {}
-
-  @Override
-  protected void close() throws IOException {
-    
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectorySymlinkTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectorySymlinkTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectorySymlinkTest.java
new file mode 100644
index 0000000..f3e2080
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectorySymlinkTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.Version;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HdfsDirectorySymlinkTest {
+
+  private Path _base;
+  private Configuration _configuration;
+  private FileSystem _fileSystem;
+
+  @Before
+  public void setup() throws IOException {
+    _base = new Path("./target/tmp/HdfsDirectoryTest");
+    _configuration = new Configuration();
+
+    _fileSystem = _base.getFileSystem(_configuration);
+    _fileSystem.delete(_base, true);
+    _fileSystem.mkdirs(_base);
+  }
+
+  @Test
+  public void testSymlink() throws IOException {
+    HdfsDirectory dir1 = new HdfsDirectory(_configuration, new Path(_base, "dir1"));
+    IndexOutput output = dir1.createOutput("file1", IOContext.DEFAULT);
+    output.writeLong(12345);
+    output.close();
+
+    assertTrue(dir1.fileExists("file1"));
+
+    HdfsDirectory dir2 = new HdfsDirectory(_configuration, new Path(_base, "dir2"));
+    dir1.copy(dir2, "file1", "file2", IOContext.DEFAULT);
+
+    assertTrue(dir2.fileExists("file2"));
+    assertEquals(8, dir2.fileLength("file2"));
+
+    String[] listAll = dir2.listAll();
+    assertEquals(1, listAll.length);
+    assertEquals("file2", listAll[0]);
+
+    IndexInput input = dir2.openInput("file2", IOContext.DEFAULT);
+    assertEquals(12345, input.readLong());
+    input.close();
+
+    dir2.deleteFile("file2");
+
+    assertFalse(dir2.fileExists("file2"));
+    assertTrue(dir1.fileExists("file1"));
+
+    dir2.close();
+    dir1.close();
+  }
+
+  @Test
+  public void testSymlinkWithIndexes() throws IOException {
+    HdfsDirectory dir1 = new HdfsDirectory(_configuration, new Path(_base, "dir1"));
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
+    IndexWriter writer1 = new IndexWriter(dir1, conf.clone());
+    writer1.addDocument(getDoc());
+    writer1.close();
+
+    HdfsDirectory dir2 = new HdfsDirectory(_configuration, new Path(_base, "dir2"));
+    IndexWriter writer2 = new IndexWriter(dir2, conf.clone());
+    writer2.addIndexes(dir1);
+    writer2.close();
+
+    DirectoryReader reader1 = DirectoryReader.open(dir1);
+    DirectoryReader reader2 = DirectoryReader.open(dir2);
+
+    assertEquals(1, reader1.maxDoc());
+    assertEquals(1, reader2.maxDoc());
+    assertEquals(1, reader1.numDocs());
+    assertEquals(1, reader2.numDocs());
+
+    Document document1 = reader1.document(0);
+    Document document2 = reader2.document(0);
+
+    assertEquals(document1.get("id"), document2.get("id"));
+  }
+
+  private Document getDoc() {
+    Document document = new Document();
+    document.add(new StringField("id", UUID.randomUUID().toString(), Store.YES));
+    return document;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bfb49c73/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
index 3dc046d..750478d 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.blur.store.hdfs_v2;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;


Mime
View raw message