incubator-blur-commits mailing list archives

From: amccu...@apache.org
Subject: [2/5] git commit: Many optimizations to make NRT updates faster.
Date: Mon, 06 Jan 2014 20:42:33 GMT
Many optimizations to make NRT updates faster.

In summary: BlurIndexSimpleWriter drops its background commit and refresh
threads in favor of a synchronous commit-and-reopen (via
DirectoryReader.openIfChanged) on each mutation; HdfsDirectory caches file
names and lengths in memory so listAll, fileExists, and fileLength no longer
make a filesystem round trip; and a new hdfs_v2 package adds a write-ahead
HdfsKeyValueStore, a block-oriented FastHdfsKeyValueDirectory on top of it,
and a JoinDirectory that routes merge output to long-term HDFS storage and
NRT flushes to the short-term key/value store. BlurIndexSimpleWriter also
becomes the default BlurIndex implementation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/5a18fa97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/5a18fa97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/5a18fa97

Branch: refs/heads/master
Commit: 5a18fa97b6692bb79e1c216ef8cbae1c085ee104
Parents: 6236a18
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Jan 3 20:56:36 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Jan 5 09:32:06 2014 -0500

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java     |  19 +-
 .../manager/writer/BlurIndexSimpleWriter.java   | 126 ++-----
 .../org/apache/blur/server/TableContext.java    |   6 +-
 .../apache/blur/store/hdfs/HdfsDirectory.java   |  37 ++-
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      | 150 +++++++++
 .../hdfs_v2/FastHdfsKeyValueIndexInput.java     | 131 ++++++++
 .../hdfs_v2/FastHdfsKeyValueIndexOutput.java    | 112 +++++++
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 330 +++++++++++++++++++
 .../blur/store/hdfs_v2/JoinDirectory.java       | 160 +++++++++
 .../org/apache/blur/store/hdfs_v2/Store.java    |  39 +++
 .../FastHdfsKeyValueDirectoryTestSuite.java     |  48 +++
 .../blur/store/JoinDirectoryTestSuite.java      |  51 +++
 .../store/hdfs_v2/HdfsKeyValueStoreTest.java    | 100 ++++++
 .../apache/blur/thrift/BlurClientManager.java   |   4 +-
 14 files changed, 1203 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 7d6cede..4ab6188 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -17,6 +17,7 @@ package org.apache.blur.manager.indexserver;
  * limitations under the License.
  */
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -55,6 +56,8 @@ import org.apache.blur.server.TableContext;
 import org.apache.blur.store.BlockCacheDirectoryFactory;
 import org.apache.blur.store.hdfs.BlurLockFactory;
 import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.store.hdfs_v2.FastHdfsKeyValueDirectory;
+import org.apache.blur.store.hdfs_v2.JoinDirectory;
 import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurUtil;
@@ -460,8 +463,20 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
 
     BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsDirPath, _nodeName, BlurUtil.getPid());
 
-    Directory directory = new HdfsDirectory(_configuration, hdfsDirPath);
-    directory.setLockFactory(lockFactory);
+    HdfsDirectory longTermStorage = new HdfsDirectory(_configuration, hdfsDirPath);
+    longTermStorage.setLockFactory(lockFactory);
+
+    Directory directory;
+    URI uri = hdfsDirPath.toUri();
+    String scheme = uri.getScheme();
+    if (scheme != null && scheme.equals("hdfs")) {
+      LOG.info("Using Fast HDFS directory implementation on shard [{0}] for table [{1}]", shard, table);
+      FastHdfsKeyValueDirectory shortTermStorage = new FastHdfsKeyValueDirectory(_configuration, new Path(hdfsDirPath,
+          "fast"));
+      directory = new JoinDirectory(longTermStorage, shortTermStorage);
+    } else {
+      directory = longTermStorage;
+    }
 
     ShardContext shardContext = ShardContext.create(tableContext, shard);
 

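For orientation, a minimal standalone sketch of the directory stack this hunk
assembles, assuming an illustrative hdfs:// table path and omitting the
BlurLockFactory setup shown above:

import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.blur.store.hdfs_v2.FastHdfsKeyValueDirectory;
import org.apache.blur.store.hdfs_v2.JoinDirectory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.Directory;

public class ShardDirectorySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative shard path; on a real cluster this is the table's shard dir.
    Path shardPath = new Path("hdfs://namenode:9000/blur/tables/table1/shard-00000000");

    HdfsDirectory longTermStorage = new HdfsDirectory(conf, shardPath);
    // NRT-sized writes land in a key/value store under the "fast" subdirectory,
    // mirroring the hdfs:// branch in the hunk above.
    FastHdfsKeyValueDirectory shortTermStorage =
        new FastHdfsKeyValueDirectory(conf, new Path(shardPath, "fast"));
    Directory directory = new JoinDirectory(longTermStorage, shortTermStorage);
    System.out.println(directory);
  }
}
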
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 9e18c35..08b6694 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -17,10 +17,7 @@
 package org.apache.blur.manager.writer;
 
 import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -44,6 +41,8 @@ import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.trace.Trace;
+import org.apache.blur.trace.Tracer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Field;
@@ -73,12 +72,6 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private IndexImporter _indexImporter;
   private final ReadWriteLock _lock = new ReentrantReadWriteLock();
   private final Lock _readLock = _lock.readLock();
-  private final Thread _refresherThread;
-  private final Thread _commitThread;
-  private final long _commitTime;
-  private final long _refreshTime;
-
-  private volatile boolean _dirty;
   private Thread _optimizeThread;
 
   public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
@@ -89,8 +82,6 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _shardContext = shardContext;
     _tableContext = _shardContext.getTableContext();
     _fieldManager = _tableContext.getFieldManager();
-    _commitTime = _tableContext.getBlurConfiguration().getLong(BLUR_SHARD_TIME_BETWEEN_COMMITS, 30000);
-    _refreshTime = _tableContext.getBlurConfiguration().getLong(BLUR_SHARD_TIME_BETWEEN_REFRESHS, 1000);
     Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
     _conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
     _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
@@ -114,72 +105,6 @@ public class BlurIndexSimpleWriter extends BlurIndex {
 
     _writerOpener = getWriterOpener(shardContext);
     _writerOpener.start();
-
-    _refresherThread = getRefresherThread();
-    _refresherThread.start();
-
-    _commitThread = getCommitThread();
-    _commitThread.start();
-  }
-
-  private Thread getCommitThread() {
-    String table = _tableContext.getTable();
-    String shard = _shardContext.getShard();
-    Thread thread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        long pause = _commitTime;
-        while (!_isClosed.get()) {
-          synchronized (this) {
-            try {
-              wait(pause);
-            } catch (InterruptedException e) {
-              return;
-            }
-          }
-          try {
-            _writer.get().commit();
-            pause = _commitTime;
-          } catch (IOException e) {
-            LOG.error("Unknown error during commit/refresh.", e);
-            pause = Math.min(pause * 10, TimeUnit.MINUTES.toMillis(1));
-          }
-        }
-      }
-    });
-    thread.setName("Commiter for table [" + table + "] shard [" + shard + "]");
-    thread.setDaemon(true);
-    return thread;
-  }
-
-  private Thread getRefresherThread() {
-    String table = _tableContext.getTable();
-    String shard = _shardContext.getShard();
-    Thread thread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        long pause = _refreshTime;
-        while (!_isClosed.get()) {
-          synchronized (this) {
-            try {
-              wait(pause);
-            } catch (InterruptedException e) {
-              return;
-            }
-          }
-          try {
-            waitToBeVisible(true);
-            pause = _refreshTime;
-          } catch (IOException e) {
-            LOG.error("Unknown error during refresh.", e);
-            pause = Math.min(pause * 10, TimeUnit.MINUTES.toMillis(1));
-          }
-        }
-      }
-    });
-    thread.setName("Refresh for table [" + table + "] shard [" + shard + "]");
-    thread.setDaemon(true);
-    return thread;
   }
 
   private DirectoryReader wrap(DirectoryReader reader) {
@@ -232,14 +157,15 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   @Override
   public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
     _readLock.lock();
+    Tracer trace = Trace.trace("replaceRow");
     try {
       waitUntilNotNull(_writer);
       BlurIndexWriter writer = _writer.get();
       List<List<Field>> docs = TransactionRecorder.getDocs(row, _fieldManager);
       writer.updateDocuments(TransactionRecorder.createRowId(row.getId()), docs);
-      _dirty = true;
-      waitToBeVisible(waitToBeVisible);
+      commit(true);
     } finally {
+      trace.done();
       _readLock.unlock();
     }
   }
@@ -251,8 +177,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
       waitUntilNotNull(_writer);
       BlurIndexWriter writer = _writer.get();
       writer.deleteDocuments(TransactionRecorder.createRowId(rowId));
-      _dirty = true;
-      waitToBeVisible(waitToBeVisible);
+      commit(true);
     } finally {
       _readLock.unlock();
     }
@@ -277,29 +202,12 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   @Override
   public void close() throws IOException {
     _isClosed.set(true);
-    IOUtils.cleanup(LOG, closeable(_commitThread), closeable(_refresherThread), _indexImporter, _writer.get(),
-        _indexReader.get());
-  }
-
-  private Closeable closeable(final Thread thread) {
-    return new Closeable() {
-      @Override
-      public void close() throws IOException {
-        thread.interrupt();
-      }
-    };
+    IOUtils.cleanup(LOG, _indexImporter, _writer.get(), _indexReader.get());
   }
 
   @Override
   public synchronized void refresh() throws IOException {
-    if (_dirty) {
-      DirectoryReader currentReader = _indexReader.get();
-      DirectoryReader newReader = DirectoryReader.open(_writer.get(), true);
-      LOG.debug("Refreshing index for table [{0}] shard [{1}].", _tableContext.getTable(), _shardContext.getShard());
-      _indexReader.set(wrap(newReader));
-      _indexCloser.close(currentReader);
-      _dirty = false;
-    }
+
   }
 
   @Override
@@ -348,10 +256,24 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     throw new RuntimeException("not impl");
   }
 
-  private void waitToBeVisible(boolean waitToBeVisible) throws IOException {
+  private synchronized void commit(boolean waitToBeVisible) throws IOException {
     if (waitToBeVisible) {
+      Tracer trace1 = Trace.trace("commit");
       waitUntilNotNull(_writer);
-      refresh();
+      BlurIndexWriter writer = _writer.get();
+      writer.commit();
+      trace1.done();
+
+      Tracer trace2 = Trace.trace("reopen");
+      DirectoryReader currentReader = _indexReader.get();
+      DirectoryReader newReader = DirectoryReader.openIfChanged(currentReader);
+      if (newReader == null) {
+        LOG.error("Reader should be new after commit for table [{0}] shard [{1}].", _tableContext.getTable(),
+            _shardContext.getShard());
+      }
+      _indexReader.set(wrap(newReader));
+      _indexCloser.close(currentReader);
+      trace2.done();
     }
   }
 

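The net effect of this file's changes is that updates now commit and reopen
the reader synchronously instead of waiting on background threads. A minimal
sketch of that commit-then-reopen pattern in plain Lucene (4.3-era APIs
assumed; the RAMDirectory and field names are illustrative):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class CommitThenReopenSketch {
  public static void main(String[] args) throws Exception {
    RAMDirectory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(
        Version.LUCENE_43, new StandardAnalyzer(Version.LUCENE_43)));
    DirectoryReader reader = DirectoryReader.open(writer, true);

    Document doc = new Document();
    doc.add(new StringField("rowid", "row-1", Store.YES));
    writer.updateDocument(new Term("rowid", "row-1"), doc);

    // The new path: make the mutation durable, then reopen. openIfChanged
    // returns null when nothing changed, which is why the code above logs
    // an error if that happens right after a commit.
    writer.commit();
    DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
    if (newReader != null) {
      reader.close();
      reader = newReader;
    }
    System.out.println("numDocs=" + reader.numDocs());
    reader.close();
    writer.close();
  }
}
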
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
index aa363e3..4db9058 100644
--- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -50,8 +50,8 @@ import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.manager.writer.BlurIndexCloser;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
-//import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
-import org.apache.blur.manager.writer.BlurNRTIndex;
+import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
+//import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.thrift.generated.ScoreType;
 import org.apache.blur.thrift.generated.TableDescriptor;
@@ -331,7 +331,7 @@ public class TableContext {
       DirectoryReferenceFileGC gc, ExecutorService searchExecutor, BlurIndexCloser indexCloser,
       BlurIndexRefresher refresher, BlurIndexWarmup indexWarmup) throws IOException {
 
-    String className = _blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, BlurNRTIndex.class.getName());
+    String className = _blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, BlurIndexSimpleWriter.class.getName());
 
     Class<? extends BlurIndex> clazz;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 7ea7fe4..89d0f46 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -24,8 +24,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -67,6 +70,8 @@ public class HdfsDirectory extends Directory implements LastModified {
   protected final Path _path;
   protected final FileSystem _fileSystem;
   protected final MetricsGroup _metricsGroup;
+  protected final Map<String, Long> _fileMap = new ConcurrentHashMap<String, Long>();
+  protected final boolean _useCache = true;
 
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
     this._path = path;
@@ -83,6 +88,14 @@ public class HdfsDirectory extends Directory implements LastModified {
       }
       _metricsGroup = metricsGroup;
     }
+    if (_useCache) {
+      FileStatus[] listStatus = _fileSystem.listStatus(_path);
+      for (FileStatus fileStatus : listStatus) {
+        if (!fileStatus.isDir()) {
+          _fileMap.put(fileStatus.getPath().getName(), fileStatus.getLen());
+        }
+      }
+    }
   }
 
   private MetricsGroup createNewMetricsGroup(String scope) {
@@ -104,11 +117,12 @@ public class HdfsDirectory extends Directory implements LastModified {
   }
 
   @Override
-  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+  public IndexOutput createOutput(final String name, IOContext context) throws IOException {
     LOG.debug("createOutput [{0}] [{1}] [{2}]", name, context, _path);
     if (fileExists(name)) {
       throw new IOException("File [" + name + "] already exists found.");
     }
+    _fileMap.put(name, 0L);
     final FSDataOutputStream outputStream = openForOutput(name);
     return new BufferedIndexOutput() {
 
@@ -129,6 +143,7 @@ public class HdfsDirectory extends Directory implements LastModified {
       @Override
       public void close() throws IOException {
         super.close();
+        _fileMap.put(name, outputStream.getPos());
         outputStream.close();
       }
 
@@ -173,6 +188,12 @@ public class HdfsDirectory extends Directory implements LastModified {
   @Override
   public String[] listAll() throws IOException {
     LOG.debug("listAll [{0}]", _path);
+    
+    if (_useCache) {
+      Set<String> names = new HashSet<String>(_fileMap.keySet());
+      return names.toArray(new String[names.size()]);
+    }
+    
     Tracer trace = Trace.trace("filesystem - list", Trace.param("path", _path));
     try {
       FileStatus[] files = _fileSystem.listStatus(_path, new PathFilter() {
@@ -198,6 +219,9 @@ public class HdfsDirectory extends Directory implements LastModified {
   @Override
   public boolean fileExists(String name) throws IOException {
     LOG.debug("fileExists [{0}] [{1}]", name, _path);
+    if (_useCache) {
+      return _fileMap.containsKey(name);
+    }
     return exists(name);
   }
 
@@ -215,6 +239,9 @@ public class HdfsDirectory extends Directory implements LastModified {
   public void deleteFile(String name) throws IOException {
     LOG.debug("deleteFile [{0}] [{1}]", name, _path);
     if (fileExists(name)) {
+      if (_useCache) {
+        _fileMap.remove(name);
+      }
       delete(name);
     } else {
       throw new FileNotFoundException("File [" + name + "] not found");
@@ -234,6 +261,14 @@ public class HdfsDirectory extends Directory implements LastModified {
   @Override
   public long fileLength(String name) throws IOException {
     LOG.debug("fileLength [{0}] [{1}]", name, _path);
+    if (_useCache) {
+      Long length = _fileMap.get(name);
+      if (length == null) {
+        throw new FileNotFoundException(name);
+      }
+      return length;
+    }
+    
     return length(name);
   }
 

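A small sketch of the observable behavior of the new metadata cache; the
file:// path and file name are illustrative, and it assumes the constructor
creates the path as in the test suites further below:

import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;

public class HdfsDirectoryCacheSketch {
  public static void main(String[] args) throws Exception {
    // file:// keeps the sketch runnable without a cluster; the path is made up.
    HdfsDirectory dir = new HdfsDirectory(new Configuration(),
        new Path("file:///tmp/hdfs-dir-cache-sketch"));

    IndexOutput out = dir.createOutput("segment.dat", IOContext.DEFAULT);
    out.writeBytes(new byte[1234], 0, 1234);
    out.close(); // close() records the final stream position in _fileMap

    // Both calls are served from the in-memory cache, with no NameNode
    // (or local filesystem) round trip.
    System.out.println(dir.fileExists("segment.dat")); // true
    System.out.println(dir.fileLength("segment.dat")); // 1234
    dir.close();
  }
}
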
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
new file mode 100644
index 0000000..5536d99
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs_v2;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.store.blockcache.LastModified;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.NoLockFactory;
+import org.apache.lucene.util.BytesRef;
+
+public class FastHdfsKeyValueDirectory extends Directory implements LastModified {
+
+  private static final String LASTMOD = "/lastmod";
+  private static final String LENGTH = "/length";
+  private static final BytesRef FILES = new BytesRef("FILES");
+  private static final String SEP = "|";
+  private final Map<String, Long> _files = new ConcurrentHashMap<String, Long>();
+  private final HdfsKeyValueStore _store;
+  private final int _blockSize = 4096;
+
+  public FastHdfsKeyValueDirectory(Configuration configuration, Path path) throws IOException {
+    _store = new HdfsKeyValueStore(configuration, path);
+    BytesRef value = new BytesRef();
+    if (_store.get(FILES, value)) {
+      String filesString = value.utf8ToString();
+      String[] files = filesString.split("\\" + SEP);
+      for (String file : files) {
+        if (file.isEmpty()) {
+          continue;
+        }
+        BytesRef key = new BytesRef(file + LENGTH);
+        if (_store.get(key, value)) {
+          _files.put(file, Long.parseLong(value.utf8ToString()));
+        } else {
+          throw new IOException("Missing meta data for [" + key.utf8ToString() + "].");
+        }
+      }
+    }
+    setLockFactory(NoLockFactory.getNoLockFactory());
+  }
+
+  public void writeBlock(String name, long blockId, byte[] b, int offset, int length) throws IOException {
+    _store.put(new BytesRef(name + "/" + blockId), new BytesRef(b, offset, length));
+  }
+
+  public void readBlock(String name, long blockId, BytesRef ref) throws IOException {
+    if (!_store.get(new BytesRef(name + "/" + blockId), ref)) {
+      throw new IOException("Block [" + name + "] [" + blockId + "] not found.");
+    }
+  }
+
+  public synchronized void writeLength(String name, long length) throws IOException {
+    _files.put(name, length);
+    _store.put(new BytesRef(name + LENGTH), new BytesRef(Long.toString(length)));
+    _store.put(new BytesRef(name + LASTMOD), new BytesRef(Long.toString(System.currentTimeMillis())));
+    writeFilesNames();
+  }
+
+  private void writeFilesNames() throws IOException {
+    StringBuilder builder = new StringBuilder();
+    for (String n : _files.keySet()) {
+      if (builder.length() != 0) {
+        builder.append(SEP);
+      }
+      builder.append(n);
+    }
+    _store.put(FILES, new BytesRef(builder.toString()));
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return new FastHdfsKeyValueIndexInput(name, fileLength(name), _blockSize, this);
+  }
+
+  @Override
+  public IndexOutput createOutput(final String name, IOContext context) throws IOException {
+    return new FastHdfsKeyValueIndexOutput(name, _blockSize, this);
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    Set<String> fileNames = new HashSet<String>(_files.keySet());
+    fileNames.remove(null);
+    return fileNames.toArray(new String[fileNames.size()]);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    return _files.containsKey(name);
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    _files.remove(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    if (fileExists(name)) {
+      return _files.get(name);
+    }
+    throw new FileNotFoundException(name);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    _store.sync();
+  }
+
+  @Override
+  public void close() throws IOException {
+    _store.close();
+  }
+
+  @Override
+  public long getFileModified(String name) throws IOException {
+    BytesRef value = new BytesRef();
+    if (_store.get(new BytesRef(name + LASTMOD), value)) {
+      return Long.parseLong(value.utf8ToString());
+    }
+    throw new FileNotFoundException(name);
+  }
+
+}

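A sketch of the key layout implied by the constants above; the file name and
size are illustrative:

public class KeyLayoutSketch {
  public static void main(String[] args) {
    // Derived from the constants above (_blockSize = 4096, LENGTH = "/length",
    // LASTMOD = "/lastmod", SEP = "|"). For a 10,000-byte file "_0.fdt" the
    // backing key/value store would hold roughly:
    //
    //   FILES           -> "_0.fdt"          (|-separated list of live files)
    //   _0.fdt/length   -> "10000"
    //   _0.fdt/lastmod  -> "<epoch millis>"
    //   _0.fdt/0        -> bytes 0..4095
    //   _0.fdt/1        -> bytes 4096..8191
    //   _0.fdt/2        -> bytes 8192..9999  (short final block)
    long length = 10000;
    int blockSize = 4096;
    long lastBlockId = (length - 1) / blockSize;
    System.out.println("_0.fdt spans blocks 0.." + lastBlockId); // 0..2
  }
}
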
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexInput.java
new file mode 100644
index 0000000..f37cea4
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexInput.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs_v2;
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.BytesRef;
+
+public class FastHdfsKeyValueIndexInput extends IndexInput {
+
+  private final int _blockSize;
+  private final FastHdfsKeyValueDirectory _dir;
+  private final String _name;
+
+  private long _position;
+  private long _length;
+  private BytesRef _currentBlock;
+  private long _currentBlockId = -1l;
+  private boolean _closed;
+  private int _bufferPosition = -1;
+
+  public FastHdfsKeyValueIndexInput(String name, long length, int blockSize, FastHdfsKeyValueDirectory dir) {
+    super(name);
+    _name = name;
+    _length = length;
+    _blockSize = blockSize;
+    _dir = dir;
+    _currentBlock = new BytesRef(blockSize);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _closed = true;
+  }
+
+  @Override
+  public long getFilePointer() {
+    return _position;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    if (pos != _position) {
+      _position = pos;
+      _bufferPosition = -1;
+    }
+  }
+
+  @Override
+  public long length() {
+    return _length;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    ensureOpen();
+    checkEOF(1);
+    setupRead();
+    byte b = _currentBlock.bytes[_bufferPosition];
+    _bufferPosition++;
+    _position++;
+    return b;
+  }
+
+  private void setupRead() throws IOException {
+    long blockId = _position / _blockSize;
+    if (_currentBlockId != blockId) {
+      // read
+      _dir.readBlock(_name, blockId, _currentBlock);
+      _currentBlockId = blockId;
+      _bufferPosition = (int) (_position % _blockSize);
+    }
+    // In case the seek stayed within the current block, recompute
+    // _bufferPosition if it was invalidated.
+    if (_bufferPosition == -1) {
+      _bufferPosition = (int) (_position % _blockSize);
+    }
+  }
+
+  private void ensureOpen() throws IOException {
+    if (_closed) {
+      throw new IOException("Already closed.");
+    }
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int length) throws IOException {
+    ensureOpen();
+    checkEOF(length);
+    while (length > 0) {
+      setupRead();
+      int len = Math.min(_blockSize - _bufferPosition, length);
+      System.arraycopy(_currentBlock.bytes, _bufferPosition, b, offset, len);
+      _position += len;
+      offset += len;
+      length -= len;
+      _bufferPosition += len;
+    }
+  }
+
+  private void checkEOF(int length) throws IOException {
+    if (_position + length > _length) {
+      throw new IOException("EOF reached.");
+    }
+  }
+
+  @Override
+  public IndexInput clone() {
+    FastHdfsKeyValueIndexInput clone = (FastHdfsKeyValueIndexInput) super.clone();
+    clone._currentBlock = new BytesRef();
+    clone._currentBlockId = -1;
+    clone._bufferPosition = -1;
+    return clone;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexOutput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexOutput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexOutput.java
new file mode 100644
index 0000000..56b3cfe
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueIndexOutput.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs_v2;
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexOutput;
+
+public class FastHdfsKeyValueIndexOutput extends IndexOutput {
+
+  private long _position;
+  private byte[] _buffer;
+  private int _bufferPosition;
+  private final int _blockSize;
+  private final FastHdfsKeyValueDirectory _dir;
+  private final String _name;
+  private boolean _closed;
+
+  public FastHdfsKeyValueIndexOutput(String name, int blockSize, FastHdfsKeyValueDirectory dir) {
+    _blockSize = blockSize;
+    _buffer = new byte[blockSize];
+    _name = name;
+    _dir = dir;
+  }
+
+  @Override
+  public void flush() throws IOException {
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    _closed = true;
+    long blockId;
+    if (_bufferPosition == _blockSize) {
+      blockId = (_position - 1) / _blockSize;
+    } else {
+      blockId = (_position) / _blockSize;
+    }
+    _dir.writeBlock(_name, blockId, _buffer, 0, _bufferPosition);
+    _dir.writeLength(_name, _position);
+  }
+
+  @Override
+  public long getFilePointer() {
+    return _position;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    throw new IOException("not supported");
+  }
+
+  @Override
+  public long length() throws IOException {
+    return _position;
+  }
+
+  @Override
+  public void writeByte(byte b) throws IOException {
+    ensureOpen();
+    tryToFlush();
+    _buffer[_bufferPosition] = b;
+    _bufferPosition++;
+    _position++;
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int offset, int length) throws IOException {
+    ensureOpen();
+    while (length > 0) {
+      tryToFlush();
+      int len = Math.min(_blockSize - _bufferPosition, length);
+      System.arraycopy(b, offset, _buffer, _bufferPosition, len);
+      _bufferPosition += len;
+      _position += len;
+      offset += len;
+      length -= len;
+    }
+  }
+
+  private void ensureOpen() throws IOException {
+    if (_closed) {
+      throw new IOException("Already closed.");
+    }
+  }
+
+  private void tryToFlush() throws IOException {
+    if (_bufferPosition > _blockSize) {
+      throw new IOException("Problem");
+    } else if (_bufferPosition == _blockSize) {
+      long blockId = (_position - 1) / _blockSize;
+      _dir.writeBlock(_name, blockId, _buffer, 0, _bufferPosition);
+      _bufferPosition = 0;
+    }
+  }
+
+}

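Putting the input and output classes together, a hedged round-trip sketch
through the directory API (local file:// path so it runs without a cluster;
reopening an existing store depends on HDFS-specific internals in
HdfsKeyValueStore):

import org.apache.blur.store.hdfs_v2.FastHdfsKeyValueDirectory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;

public class KeyValueRoundTripSketch {
  public static void main(String[] args) throws Exception {
    FastHdfsKeyValueDirectory dir = new FastHdfsKeyValueDirectory(
        new Configuration(), new Path("file:///tmp/kv-roundtrip-sketch"));

    byte[] data = new byte[10000]; // spans three 4 KB blocks
    for (int i = 0; i < data.length; i++) {
      data[i] = (byte) i;
    }

    IndexOutput out = dir.createOutput("_0.fdt", IOContext.DEFAULT);
    out.writeBytes(data, 0, data.length);
    out.close(); // flushes the short final block, then the length record

    IndexInput in = dir.openInput("_0.fdt", IOContext.DEFAULT);
    in.seek(5000); // lands inside block 1; only that block is fetched
    System.out.println(in.readByte() == data[5000]); // true
    in.close();
    dir.close();
  }
}
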
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
new file mode 100644
index 0000000..dc886b7
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs_v2;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.lucene.util.BytesRef;
+
+public class HdfsKeyValueStore implements Store {
+
+  // private final Log LOG = LogFactory.getLog(HdfsKeyValueStore.class);
+
+  static enum OperationType {
+    PUT, DELETE
+  }
+
+  static class Operation implements Writable {
+
+    OperationType type;
+    BytesWritable key = new BytesWritable();
+    BytesWritable value = new BytesWritable();
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      if (type == OperationType.DELETE) {
+        out.write(0);
+        key.write(out);
+      } else if (type == OperationType.PUT) {
+        out.write(1);
+        key.write(out);
+        value.write(out);
+      } else {
+        throw new RuntimeException("Not supported [" + type + "]");
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      byte b = in.readByte();
+      switch (b) {
+      case 0:
+        type = OperationType.DELETE;
+        key.readFields(in);
+        return;
+      case 1:
+        type = OperationType.PUT;
+        key.readFields(in);
+        value.readFields(in);
+        return;
+      default:
+        throw new RuntimeException("Not supported [" + b + "]");
+      }
+    }
+
+  }
+
+  private static final Comparator<BytesRef> COMP = new Comparator<BytesRef>() {
+    @Override
+    public int compare(BytesRef b1, BytesRef b2) {
+      return WritableComparator.compareBytes(b1.bytes, b1.offset, b1.length, b2.bytes, b2.offset, b2.length);
+    }
+  };
+
+  private final ConcurrentNavigableMap<BytesRef, BytesRef> _pointers = new ConcurrentSkipListMap<BytesRef, BytesRef>(
+      COMP);
+  private final Configuration _configuration;
+  private final Path _path;
+  private final ReentrantReadWriteLock _readWriteLock;
+  private final AtomicReference<SortedSet<FileStatus>> _fileStatus = new AtomicReference<SortedSet<FileStatus>>();
+  private final FileSystem _fileSystem;
+  private final AtomicLong _currentFileCounter = new AtomicLong();
+  private final WriteLock _writeLock;
+  private final ReadLock _readLock;
+  private FSDataOutputStream _output;
+  private Path _outputPath;
+  private boolean _isClosed;
+
+  public HdfsKeyValueStore(Configuration configuration, Path path) throws IOException {
+    _configuration = configuration;
+    _path = path;
+    _configuration.setBoolean("fs.hdfs.impl.disable.cache", true);
+    _fileSystem = FileSystem.get(_path.toUri(), _configuration);
+    _readWriteLock = new ReentrantReadWriteLock();
+    _writeLock = _readWriteLock.writeLock();
+    _readLock = _readWriteLock.readLock();
+    _fileStatus.set(getList(_path));
+    if (!_fileStatus.get().isEmpty()) {
+      _currentFileCounter.set(Long.parseLong(_fileStatus.get().last().getPath().getName()));
+    }
+    loadIndexes();
+    openWriter();
+  }
+
+  @Override
+  public void sync() throws IOException {
+    ensureOpen();
+    _writeLock.lock();
+    try {
+      syncInternal();
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
+  @Override
+  public Iterable<Entry<BytesRef, BytesRef>> scan(BytesRef key) throws IOException {
+    ensureOpen();
+    // TODO: scan is not yet implemented.
+    return null;
+  }
+
+  @Override
+  public void put(BytesRef key, BytesRef value) throws IOException {
+    ensureOpen();
+    if (value == null) {
+      delete(key);
+      return;
+    }
+    _writeLock.lock();
+    try {
+      Operation op = getPutOperation(OperationType.PUT, key, value);
+      write(op);
+      _pointers.put(BytesRef.deepCopyOf(key), BytesRef.deepCopyOf(value));
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
+  private void write(Operation op) throws IOException {
+    op.write(_output);
+  }
+
+  private Operation getPutOperation(OperationType put, BytesRef key, BytesRef value) {
+    Operation operation = new Operation();
+    operation.type = put;
+    operation.key.set(key.bytes, key.offset, key.length);
+    operation.value.set(value.bytes, value.offset, value.length);
+    return operation;
+  }
+
+  private Operation getDeleteOperation(OperationType delete, BytesRef key) {
+    Operation operation = new Operation();
+    operation.type = delete;
+    operation.key.set(key.bytes, key.offset, key.length);
+    return operation;
+  }
+
+  @Override
+  public boolean get(BytesRef key, BytesRef value) throws IOException {
+    ensureOpen();
+    _readLock.lock();
+    try {
+      BytesRef internalValue = _pointers.get(key);
+      if (internalValue == null) {
+        return false;
+      }
+      value.copyBytes(internalValue);
+      return true;
+    } finally {
+      _readLock.unlock();
+    }
+  }
+
+  @Override
+  public void delete(BytesRef key) throws IOException {
+    ensureOpen();
+    _writeLock.lock();
+    try {
+      Operation op = getDeleteOperation(OperationType.DELETE, key);
+      write(op);
+      _pointers.remove(key);
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (_isClosed) {
+      _writeLock.lock();
+      try {
+        syncInternal();
+        _output.close();
+        _fileSystem.close();
+        _isClosed = true;
+      } finally {
+        _writeLock.unlock();
+      }
+    }
+  }
+
+  private void openWriter() throws IOException {
+    long nextSegment = _currentFileCounter.incrementAndGet();
+    String name = buffer(nextSegment);
+    _outputPath = new Path(_path, name);
+    _output = _fileSystem.create(_outputPath, false);
+  }
+
+  private String buffer(long number) {
+    String s = Long.toString(number);
+    StringBuilder builder = new StringBuilder();
+    for (int i = s.length(); i < 12; i++) {
+      builder.append('0');
+    }
+    return builder.append(s).toString();
+  }
+
+  private void loadIndexes() throws IOException {
+    for (FileStatus fileStatus : _fileStatus.get()) {
+      loadIndex(fileStatus.getPath());
+    }
+  }
+
+  private void ensureOpen() throws IOException {
+    if (_isClosed) {
+      throw new IOException("Already closed.");
+    }
+  }
+
+  private long getFileLength(Path path, FSDataInputStream inputStream) throws IOException {
+    FileStatus fileStatus = _fileSystem.getFileStatus(path);
+    DFSInputStream dfs = getDFS(inputStream);
+    return Math.max(dfs.getFileLength(), fileStatus.getLen());
+  }
+
+  private void syncInternal() throws IOException {
+    _output.flush();
+    _output.sync();
+  }
+
+  private void loadIndex(Path path) throws IOException {
+    FSDataInputStream inputStream = _fileSystem.open(path);
+    long fileLength = getFileLength(path, inputStream);
+    Operation operation = new Operation();
+    try {
+      while (inputStream.getPos() < fileLength) {
+        try {
+          operation.readFields(inputStream);
+        } catch (IOException e) {
+          // End of sync point found
+          return;
+        }
+        loadIndex(operation);
+      }
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  private void loadIndex(Operation operation) {
+    switch (operation.type) {
+    case PUT:
+      _pointers.put(BytesRef.deepCopyOf(getKey(operation.key)), BytesRef.deepCopyOf(getKey(operation.value)));
+      return;
+    case DELETE:
+      _pointers.remove(getKey(operation.key));
+      return;
+    default:
+      throw new RuntimeException("Not supported [" + operation.type + "]");
+    }
+  }
+
+  private BytesRef getKey(BytesWritable key) {
+    return new BytesRef(key.getBytes(), 0, key.getLength());
+  }
+
+  private SortedSet<FileStatus> getList(Path p) throws IOException {
+    FileStatus[] listStatus = _fileSystem.listStatus(p);
+    if (listStatus == null) {
+      return new TreeSet<FileStatus>();
+    }
+    return new TreeSet<FileStatus>(Arrays.asList(listStatus));
+  }
+
+  private static DFSInputStream getDFS(FSDataInputStream inputStream) throws IOException {
+    try {
+      Field field = FilterInputStream.class.getDeclaredField("in");
+      field.setAccessible(true);
+      return (DFSInputStream) field.get(inputStream);
+    } catch (NoSuchFieldException e) {
+      throw new IOException(e);
+    } catch (SecurityException e) {
+      throw new IOException(e);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+}

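For reference, a sketch of the write-ahead-log record format that loadIndex
replays on startup; Operation is package-private, so this would have to live
in org.apache.blur.store.hdfs_v2, and the key/value contents are illustrative:

package org.apache.blur.store.hdfs_v2;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class WalRecordSketch {
  public static void main(String[] args) throws Exception {
    HdfsKeyValueStore.Operation put = new HdfsKeyValueStore.Operation();
    put.type = HdfsKeyValueStore.OperationType.PUT;
    put.key.set("a".getBytes("UTF-8"), 0, 1);
    put.value.set("value1".getBytes("UTF-8"), 0, 6);

    // Serialize exactly as write(DataOutput) does: opcode byte 1 for PUT,
    // followed by the BytesWritable key and value.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    put.write(new DataOutputStream(bytes));

    // Replay the record the way loadIndex(Path) does at startup.
    HdfsKeyValueStore.Operation replayed = new HdfsKeyValueStore.Operation();
    replayed.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(replayed.type); // PUT
  }
}
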
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
new file mode 100644
index 0000000..9f73785
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs_v2;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.store.blockcache.LastModified;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IOContext.Context;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+public class JoinDirectory extends Directory implements LastModified {
+
+  private final Directory _longTermStorage;
+  private final Directory _shortTermStorage;
+  private final Set<String> _shortTermSyncFiles = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private final Set<String> _longTermSyncFiles = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  public JoinDirectory(Directory longTermStorage, Directory shortTermStorage) throws IOException {
+    lastModifiedCheck(longTermStorage);
+    lastModifiedCheck(shortTermStorage);
+    _longTermStorage = longTermStorage;
+    _shortTermStorage = shortTermStorage;
+    setLockFactory(_longTermStorage.getLockFactory());
+  }
+
+  private void lastModifiedCheck(Directory directory) {
+    if (!(directory instanceof LastModified)) {
+      throw new RuntimeException("Directory [" + directory + "] does not implement '" + LastModified.class.toString()
+          + "'");
+    }
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    Set<String> names = new HashSet<String>();
+    for (String s : _longTermStorage.listAll()) {
+      names.add(s);
+    }
+    for (String s : _shortTermStorage.listAll()) {
+      names.add(s);
+    }
+    return names.toArray(new String[names.size()]);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    if (_shortTermStorage.fileExists(name)) {
+      return true;
+    }
+    return _longTermStorage.fileExists(name);
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    if (_shortTermStorage.fileExists(name)) {
+      _shortTermStorage.deleteFile(name);
+      return;
+    }
+    _longTermStorage.deleteFile(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    if (_shortTermStorage.fileExists(name)) {
+      return _shortTermStorage.fileLength(name);
+    }
+    return _longTermStorage.fileLength(name);
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    if (context.context == Context.MERGE) {
+      addLongTermSyncFile(name);
+      return _longTermStorage.createOutput(name, context);
+    }
+    addShortTermSyncFile(name);
+    return _shortTermStorage.createOutput(name, context);
+  }
+
+  private void addShortTermSyncFile(String name) {
+    _shortTermSyncFiles.add(name);
+  }
+
+  private void addLongTermSyncFile(String name) {
+    _longTermSyncFiles.add(name);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    Set<String> shortNames = new HashSet<String>();
+    Set<String> longNames = new HashSet<String>();
+
+    for (String name : names) {
+      if (_shortTermSyncFiles.contains(name)) {
+        _shortTermSyncFiles.remove(name);
+        shortNames.add(name);
+      }
+      if (_longTermSyncFiles.contains(name)) {
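A hedged sketch of what this default change means operationally; the dotted
property key is an assumption (only the constant BLUR_SHARD_BLURINDEX_CLASS
appears in the diff), while the class names are taken from it:

public class WriterSelectionSketch {
  public static void main(String[] args) throws Exception {
    // BlurIndexSimpleWriter is now the default; operators who want the
    // previous behavior can point the property back at
    // org.apache.blur.manager.writer.BlurNRTIndex.
    String className = System.getProperty("blur.shard.blurindex.class", // assumed key
        "org.apache.blur.manager.writer.BlurIndexSimpleWriter");
    Class<?> clazz = Class.forName(className);
    System.out.println("BlurIndex implementation: " + clazz.getName());
  }
}
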
+        _longTermSyncFiles.remove(name);
+        longNames.add(name);
+      }
+    }
+
+    if (!shortNames.isEmpty()) {
+//      System.out.println("Sync short [" + shortNames + "]");
+      _shortTermStorage.sync(shortNames);
+    }
+
+    if (!longNames.isEmpty()) {
+//      System.out.println("Sync long [" + longNames + "]");
+      _longTermStorage.sync(longNames);
+    }
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    if (_shortTermStorage.fileExists(name)) {
+      return _shortTermStorage.openInput(name, context);
+    }
+    return _longTermStorage.openInput(name, context);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _shortTermStorage.close();
+    _longTermStorage.close();
+  }
+
+  @Override
+  public long getFileModified(String name) throws IOException {
+    return 0;
+//    if (_shortTermStorage.fileExists(name)) {
+//      return ((LastModified) _shortTermStorage).getFileModified(name);
+//    }
+//    return ((LastModified) _longTermStorage).getFileModified(name);
+  }
+
+}

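A sketch of the routing rule above: merge-context output goes to long-term
storage, everything else to the short-term store. Paths are illustrative and
Lucene 4.x IOContext/MergeInfo APIs are assumed:

import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.blur.store.hdfs_v2.FastHdfsKeyValueDirectory;
import org.apache.blur.store.hdfs_v2.JoinDirectory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MergeInfo;

public class JoinDirectoryRoutingSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path base = new Path("file:///tmp/join-routing-sketch");
    HdfsDirectory longTerm = new HdfsDirectory(conf, new Path(base, "long"));
    FastHdfsKeyValueDirectory shortTerm =
        new FastHdfsKeyValueDirectory(conf, new Path(base, "short"));
    JoinDirectory dir = new JoinDirectory(longTerm, shortTerm);

    // A merge-context write goes to long-term HDFS storage...
    IOContext merge = new IOContext(new MergeInfo(1000, 1L << 20, false, 1));
    IndexOutput merged = dir.createOutput("_1.cfs", merge);
    merged.close();

    // ...while an ordinary flush lands in the short-term key/value store.
    IndexOutput flushed = dir.createOutput("_2.cfs", IOContext.DEFAULT);
    flushed.close();

    System.out.println(dir.fileExists("_1.cfs") && dir.fileExists("_2.cfs")); // true
    dir.close();
  }
}
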
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/Store.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/Store.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/Store.java
new file mode 100644
index 0000000..189585a
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/Store.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs_v2;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.lucene.util.BytesRef;
+
+public interface Store extends Closeable {
+
+  void sync() throws IOException;
+
+  Iterable<Entry<BytesRef, BytesRef>> scan(BytesRef key) throws IOException;
+
+  void put(BytesRef key, BytesRef value) throws IOException;
+
+  boolean get(BytesRef key, BytesRef value) throws IOException;
+
+  void delete(BytesRef key) throws IOException;
+
+  void close() throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-store/src/test/java/org/apache/blur/store/FastHdfsKeyValueDirectoryTestSuite.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/FastHdfsKeyValueDirectoryTestSuite.java b/blur-store/src/test/java/org/apache/blur/store/FastHdfsKeyValueDirectoryTestSuite.java
new file mode 100644
index 0000000..d33104f
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/FastHdfsKeyValueDirectoryTestSuite.java
@@ -0,0 +1,48 @@
+package org.apache.blur.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.blur.store.hdfs_v2.FastHdfsKeyValueDirectory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Directory;
+import org.junit.Test;
+
+public class FastHdfsKeyValueDirectoryTestSuite extends BaseDirectoryTestSuite {
+
+  @Override
+  protected Directory setupDirectory() throws IOException {
+    URI uri = new File(file, "hdfs").toURI();
+    Path hdfsDirPath = new Path(uri.toString());
+    Configuration conf = new Configuration();
+    return new FastHdfsKeyValueDirectory(conf, hdfsDirPath);
+  }
+
+  @Test
+  public void runsTheTests() {
+  }
+
+  @Override
+  protected void close() throws IOException {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-store/src/test/java/org/apache/blur/store/JoinDirectoryTestSuite.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/JoinDirectoryTestSuite.java b/blur-store/src/test/java/org/apache/blur/store/JoinDirectoryTestSuite.java
new file mode 100644
index 0000000..0e11092
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/JoinDirectoryTestSuite.java
@@ -0,0 +1,51 @@
+package org.apache.blur.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.store.hdfs_v2.JoinDirectory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Directory;
+import org.junit.Test;
+
+public class JoinDirectoryTestSuite extends BaseDirectoryTestSuite {
+
+  @Override
+  protected Directory setupDirectory() throws IOException {
+    URI uri = new File(file, "hdfs-join").toURI();
+    Path hdfsDirPath = new Path(uri.toString());
+    Configuration conf = new Configuration();
+    HdfsDirectory longTerm = new HdfsDirectory(conf, new Path(hdfsDirPath, "long"));
+    HdfsDirectory shortTerm = new HdfsDirectory(conf, new Path(hdfsDirPath, "short"));
+    return new JoinDirectory(longTerm, shortTerm);
+  }
+
+  @Test
+  public void runsTheTests() {
+  }
+
+  @Override
+  protected void close() throws IOException {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
new file mode 100644
index 0000000..6d2f26f
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs_v2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.util.BytesRef;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HdfsKeyValueStoreTest {
+
+  private Path _path = new Path("hdfs://127.0.0.1:9000/test");
+  private Configuration _configuration = new Configuration();
+
+  @Before
+  public void setup() throws IOException {
+    FileSystem fileSystem = _path.getFileSystem(_configuration);
+    fileSystem.delete(_path, true);
+  }
+
+  @Test
+  public void testPutGet() throws IOException {
+    HdfsKeyValueStore store = new HdfsKeyValueStore(_configuration, _path);
+    store.put(toBytesRef("a"), toBytesRef("value1"));
+    store.put(toBytesRef("b"), toBytesRef("value2"));
+    store.sync();
+    BytesRef value = new BytesRef();
+    store.get(toBytesRef("a"), value);
+    assertEquals(new BytesRef("value1"), value);
+    store.get(toBytesRef("b"), value);
+    assertEquals(new BytesRef("value2"), value);
+    store.close();
+  }
+
+  @Test
+  public void testPutGetDelete() throws IOException {
+    HdfsKeyValueStore store = new HdfsKeyValueStore(_configuration, _path);
+    store.put(toBytesRef("a"), toBytesRef("value1"));
+    store.put(toBytesRef("b"), toBytesRef("value2"));
+    store.sync();
+    BytesRef value = new BytesRef();
+    store.get(toBytesRef("a"), value);
+    assertEquals(new BytesRef("value1"), value);
+    store.get(toBytesRef("b"), value);
+    assertEquals(new BytesRef("value2"), value);
+
+    store.delete(toBytesRef("b"));
+    store.sync();
+    assertFalse(store.get(toBytesRef("b"), value));
+    store.close();
+  }
+
+  @Test
+  public void testPutGetReopen() throws IOException {
+    HdfsKeyValueStore store1 = new HdfsKeyValueStore(_configuration, _path);
+    store1.put(toBytesRef("a"), toBytesRef("value1"));
+    store1.put(toBytesRef("b"), toBytesRef("value2"));
+    store1.sync();
+    BytesRef value1 = new BytesRef();
+    store1.get(toBytesRef("a"), value1);
+    assertEquals(new BytesRef("value1"), value1);
+    store1.get(toBytesRef("b"), value1);
+    assertEquals(new BytesRef("value2"), value1);
+    store1.close();
+
+    HdfsKeyValueStore store2 = new HdfsKeyValueStore(_configuration, _path);
+    BytesRef value2 = new BytesRef();
+    store2.get(toBytesRef("a"), value2);
+    assertEquals(new BytesRef("value1"), value2);
+    store2.get(toBytesRef("b"), value2);
+    assertEquals(new BytesRef("value2"), value2);
+    store2.close();
+  }
+
+  private BytesRef toBytesRef(String s) {
+    return new BytesRef(s);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5a18fa97/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
index bf5b170..9aea16f 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
@@ -182,13 +182,13 @@ public class BlurClientManager {
         }
         Tracer trace = null;
         try {
+          User user = UserConverter.toThriftUser(UserContext.getUser());
+          client.get().setUser(user);
           TraceId traceId = Trace.getTraceId();
           if (traceId != null) {
             client.get().startTrace(traceId.getRootId(), traceId.getRequestId());
             trace = Trace.trace("thrift client", Trace.param("connection", getConnectionStr(client.get())));
           }
-          User user = UserConverter.toThriftUser(UserContext.getUser());
-          client.get().setUser(user);
           T result = command.call((CLIENT) client.get(), connection);
           allBad = false;
           if (command.isDetachClient()) {

