incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/5] git commit: Many updates to improve mutate performance.
Date Mon, 06 Jan 2014 20:42:32 GMT
Updated Branches:
  refs/heads/master 6236a1874 -> 0b0374732


Many updates to improve mutate performance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/6d62722b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/6d62722b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/6d62722b

Branch: refs/heads/master
Commit: 6d62722bee43d7e1ff91ed4261d4f7ae53563685
Parents: 5a18fa9
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Jan 4 21:49:06 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Jan 5 09:32:06 2014 -0500

----------------------------------------------------------------------
 .../manager/writer/BlurIndexSimpleWriter.java   | 38 +++++-----
 .../blur/thrift/BlurControllerServer.java       | 28 ++++++--
 .../main/java/org/apache/blur/shell/Main.java   |  6 +-
 .../apache/blur/store/hdfs/BlurLockFactory.java |  8 +++
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 27 ++++---
 .../apache/blur/store/hdfs/HdfsIndexInput.java  | 60 ++++------------
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      | 28 +++++++-
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 76 +++++++++++++++++---
 .../blur/store/hdfs_v2/JoinDirectory.java       | 16 ++---
 .../store/hdfs_v2/HdfsKeyValueStoreTest.java    | 22 ++++++
 .../org/apache/blur/thrift/util/LoadData.java   |  7 +-
 .../main/java/org/apache/blur/trace/Trace.java  | 16 +++--
 12 files changed, 219 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 08b6694..5090c10 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -163,7 +163,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
       BlurIndexWriter writer = _writer.get();
       List<List<Field>> docs = TransactionRecorder.getDocs(row, _fieldManager);
       writer.updateDocuments(TransactionRecorder.createRowId(row.getId()), docs);
-      commit(true);
+      commit();
     } finally {
       trace.done();
       _readLock.unlock();
@@ -177,7 +177,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
       waitUntilNotNull(_writer);
       BlurIndexWriter writer = _writer.get();
       writer.deleteDocuments(TransactionRecorder.createRowId(rowId));
-      commit(true);
+      commit();
     } finally {
       _readLock.unlock();
     }
@@ -206,7 +206,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   }
 
   @Override
-  public synchronized void refresh() throws IOException {
+  public void refresh() throws IOException {
 
   }
 
@@ -256,25 +256,23 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     throw new RuntimeException("not impl");
   }
 
-  private synchronized void commit(boolean waitToBeVisible) throws IOException {
-    if (waitToBeVisible) {
-      Tracer trace1 = Trace.trace("commit");
-      waitUntilNotNull(_writer);
-      BlurIndexWriter writer = _writer.get();
-      writer.commit();
-      trace1.done();
+  private synchronized void commit() throws IOException {
+    Tracer trace1 = Trace.trace("commit");
+    waitUntilNotNull(_writer);
+    BlurIndexWriter writer = _writer.get();
+    writer.commit();
+    trace1.done();
 
-      Tracer trace2 = Trace.trace("commit");
-      DirectoryReader currentReader = _indexReader.get();
-      DirectoryReader newReader = DirectoryReader.openIfChanged(currentReader);
-      if (newReader == null) {
-        LOG.error("Reader should be new after commit for table [{0}] shard [{1}].", _tableContext.getTable(),
-            _shardContext.getShard());
-      }
-      _indexReader.set(wrap(newReader));
-      _indexCloser.close(currentReader);
-      trace2.done();
+    Tracer trace2 = Trace.trace("index refresh");
+    DirectoryReader currentReader = _indexReader.get();
+    DirectoryReader newReader = DirectoryReader.openIfChanged(currentReader);
+    if (newReader == null) {
+      LOG.error("Reader should be new after commit for table [{0}] shard [{1}].", _tableContext.getTable(),
+          _shardContext.getShard());
     }
+    _indexReader.set(wrap(newReader));
+    _indexCloser.close(currentReader);
+    trace2.done();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index c78b6a9..0cd4e18 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -118,13 +118,29 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
     }
 
     @Override
-    public <T> T execute(String node, BlurCommand<T> command, int maxRetries,
long backOffTime, long maxBackOffTime)
-        throws BlurException, TException, IOException {
-      Tracer trace = Trace.trace("remote call - thrift", Trace.param("node", node));
+    public <T> T execute(final String node, final BlurCommand<T> command, final
int maxRetries, final long backOffTime,
+        final long maxBackOffTime) throws BlurException, TException, IOException {
+      Callable<T> callable = Trace.getCallable(new Callable<T>() {
+        @Override
+        public T call() throws Exception {
+          Tracer trace = Trace.trace("remote call - thrift", Trace.param("node", node));
+          try {
+            return BlurClientManager.execute(node + "#" + _timeout, command, maxRetries,
backOffTime, maxBackOffTime);
+          } finally {
+            trace.done();
+          }
+        }
+      });
       try {
-        return BlurClientManager.execute(node + "#" + _timeout, command, maxRetries, backOffTime,
maxBackOffTime);
-      } finally {
-        trace.done();
+        return callable.call();
+      } catch (BlurException e) {
+        throw e;
+      } catch (TException e) {
+        throw e;
+      } catch (IOException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new BException("Unknown error during remote call to node [" + node + "]", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-shell/src/main/java/org/apache/blur/shell/Main.java
----------------------------------------------------------------------
diff --git a/blur-shell/src/main/java/org/apache/blur/shell/Main.java b/blur-shell/src/main/java/org/apache/blur/shell/Main.java
index 100e62f..2f1613c 100644
--- a/blur-shell/src/main/java/org/apache/blur/shell/Main.java
+++ b/blur-shell/src/main/java/org/apache/blur/shell/Main.java
@@ -27,15 +27,16 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import jline.console.ConsoleReader;
 import jline.console.completer.Completer;
 
+import org.apache.blur.BlurConfiguration;
 import org.apache.blur.shell.Command.CommandException;
 import org.apache.blur.shell.Main.QuitCommand.QuitCommandException;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
@@ -49,6 +50,7 @@ import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.trace.LogTraceStorage;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.user.User;
 import org.apache.blur.user.UserContext;
@@ -518,6 +520,8 @@ public class Main {
   }
 
   public static void main(String[] args) throws Throwable {
+    
+    Trace.setStorage(new LogTraceStorage(new BlurConfiguration()));
 
     args = removeLeadingShellFromScript(args);
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
index c4f032c..4d9359a 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
@@ -18,6 +18,8 @@ package org.apache.blur.store.hdfs;
  */
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -41,6 +43,7 @@ public class BlurLockFactory extends LockFactory {
   private final String _baseLockKey;
   private byte[] _lockKey;
   private final Path _dir;
+  private long _checkTime = TimeUnit.SECONDS.toMillis(1);
 
   public BlurLockFactory(Configuration configuration, Path dir, String host, String pid)
throws IOException {
     _configuration = configuration;
@@ -52,6 +55,7 @@ public class BlurLockFactory extends LockFactory {
   @Override
   public Lock makeLock(String lockName) {
     final Path lockPath = new Path(_dir, lockName);
+    final AtomicLong _lastCheck = new AtomicLong();
 
     return new Lock() {
       private boolean _set;
@@ -89,6 +93,9 @@ public class BlurLockFactory extends LockFactory {
 
       @Override
       public boolean isLocked() throws IOException {
+        if (_lastCheck.get() + _checkTime >= System.currentTimeMillis()) {
+          return true;
+        }
         Tracer trace = Trace.trace("filesystem - isLocked", Trace.param("lockPath", lockPath));
         try {
           if (!_set) {
@@ -110,6 +117,7 @@ public class BlurLockFactory extends LockFactory {
           inputStream.readFully(buf);
           inputStream.close();
           if (Arrays.equals(_lockKey, buf)) {
+            _lastCheck.set(System.currentTimeMillis());
             return true;
           }
           LOG.info("The lock information has been changed.");

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 89d0f46..cb22b3b 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -60,8 +59,6 @@ public class HdfsDirectory extends Directory implements LastModified {
 
   private static final Log LOG = LogFactory.getLog(HdfsDirectory.class);
 
-  public static AtomicInteger fetchImpl = new AtomicInteger(3);
-
   /**
    * We keep the metrics separate per filesystem.
    */
@@ -71,6 +68,7 @@ public class HdfsDirectory extends Directory implements LastModified {
   protected final FileSystem _fileSystem;
   protected final MetricsGroup _metricsGroup;
   protected final Map<String, Long> _fileMap = new ConcurrentHashMap<String, Long>();
+  protected final Map<Path, FSDataInputStream> _inputMap = new ConcurrentHashMap<Path,
FSDataInputStream>();
   protected final boolean _useCache = true;
 
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
@@ -145,6 +143,7 @@ public class HdfsDirectory extends Directory implements LastModified {
         super.close();
         _fileMap.put(name, outputStream.getPos());
         outputStream.close();
+        openForInput(name);
       }
 
       @Override
@@ -172,14 +171,20 @@ public class HdfsDirectory extends Directory implements LastModified
{
     }
     FSDataInputStream inputStream = openForInput(name);
     long fileLength = fileLength(name);
-    return new HdfsIndexInput(name, inputStream, fileLength, _metricsGroup, fetchImpl.get(),
getPath(name));
+    return new HdfsIndexInput(name, inputStream, fileLength, _metricsGroup, getPath(name));
   }
 
-  protected FSDataInputStream openForInput(String name) throws IOException {
+  protected synchronized FSDataInputStream openForInput(String name) throws IOException {
     Path path = getPath(name);
+    FSDataInputStream inputStream = _inputMap.get(path);
+    if (inputStream != null) {
+      return inputStream;
+    }
     Tracer trace = Trace.trace("filesystem - open", Trace.param("path", path));
     try {
-      return _fileSystem.open(path);
+      inputStream = _fileSystem.open(path);
+      _inputMap.put(path, inputStream);
+      return inputStream;
     } finally {
       trace.done();
     }
@@ -188,12 +193,12 @@ public class HdfsDirectory extends Directory implements LastModified
{
   @Override
   public String[] listAll() throws IOException {
     LOG.debug("listAll [{0}]", _path);
-    
+
     if (_useCache) {
       Set<String> names = new HashSet<String>(_fileMap.keySet());
       return names.toArray(new String[names.size()]);
     }
-    
+
     Tracer trace = Trace.trace("filesystem - list", Trace.param("path", _path));
     try {
       FileStatus[] files = _fileSystem.listStatus(_path, new PathFilter() {
@@ -250,7 +255,11 @@ public class HdfsDirectory extends Directory implements LastModified
{
 
   protected void delete(String name) throws IOException {
     Path path = getPath(name);
+    FSDataInputStream inputStream = _inputMap.remove(path);
     Tracer trace = Trace.trace("filesystem - delete", Trace.param("path", path));
+    if (inputStream != null) {
+      inputStream.close();
+    }
     try {
       _fileSystem.delete(path, true);
     } finally {
@@ -268,7 +277,7 @@ public class HdfsDirectory extends Directory implements LastModified {
       }
       return length;
     }
-    
+
     return length(name);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
index 2854320..0331afd 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
@@ -28,18 +28,15 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
 
   private final long _length;
   private FSDataInputStream _inputStream;
-  private boolean _isClone;
   private final MetricsGroup _metricsGroup;
-  private int _readVersion;
   private final Path _path;
 
-  public HdfsIndexInput(String name, FSDataInputStream inputStream, long length, MetricsGroup
metricsGroup,
-      int readVersion, Path path) throws IOException {
+  public HdfsIndexInput(String name, FSDataInputStream inputStream, long length, MetricsGroup
metricsGroup, Path path)
+      throws IOException {
     super("HdfsIndexInput(" + path.toString() + ")");
     _inputStream = inputStream;
     _length = length;
     _metricsGroup = metricsGroup;
-    _readVersion = readVersion;
     _path = path;
   }
 
@@ -60,39 +57,12 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
     try {
       long start = System.nanoTime();
       long filePointer = getFilePointer();
-      switch (_readVersion) {
-      case 0:
-        synchronized (_inputStream) {
-          _inputStream.seek(getFilePointer());
-          _inputStream.readFully(b, offset, length);
-        }
-        break;
-      case 1:
-        while (length > 0) {
-          int amount;
-          synchronized (_inputStream) {
-            _inputStream.seek(filePointer);
-            amount = _inputStream.read(b, offset, length);
-          }
-          length -= amount;
-          offset += amount;
-          filePointer += amount;
-        }
-        break;
-      case 2:
-        _inputStream.readFully(filePointer, b, offset, length);
-        break;
-      case 3:
-        while (length > 0) {
-          int amount;
-          amount = _inputStream.read(filePointer, b, offset, length);
-          length -= amount;
-          offset += amount;
-          filePointer += amount;
-        }
-        break;
-      default:
-        break;
+      while (length > 0) {
+        int amount;
+        amount = _inputStream.read(filePointer, b, offset, length);
+        length -= amount;
+        offset += amount;
+        filePointer += amount;
       }
       long end = System.nanoTime();
       _metricsGroup.readAccess.update((end - start) / 1000);
@@ -103,17 +73,13 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
   }
 
   @Override
-  protected void closeInternal() throws IOException {
-    if (!_isClone) {
-      _inputStream.close();
-    }
-  }
-
-  @Override
   public ReusedBufferedIndexInput clone() {
     HdfsIndexInput clone = (HdfsIndexInput) super.clone();
-    clone._isClone = true;
-    clone._readVersion = HdfsDirectory.fetchImpl.get();
     return clone;
   }
+
+  @Override
+  protected void closeInternal() throws IOException {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
index 5536d99..acde4f0 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -44,6 +44,24 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
   private final HdfsKeyValueStore _store;
   private final int _blockSize = 4096;
 
+  public static void main(String[] args) throws IOException {
+    Configuration configuration = new Configuration();
+    Path path = new Path("hdfs://localhost:9000/blur/fast/shard-00000000/fast");
+    FastHdfsKeyValueDirectory dir = new FastHdfsKeyValueDirectory(configuration, path);
+    HdfsKeyValueStore store = dir._store;
+    store.cleanupOldFiles();
+    String[] listAll = dir.listAll();
+    long total = 0;
+    for (String s : listAll) {
+      long fileLength = dir.fileLength(s);
+      System.out.println(s + " " + fileLength);
+      total += fileLength;
+    }
+    System.out.println("Total [" + total + "]");
+    dir.close();
+
+  }
+
   public FastHdfsKeyValueDirectory(Configuration configuration, Path path) throws IOException
{
     _store = new HdfsKeyValueStore(configuration, path);
     BytesRef value = new BytesRef();
@@ -117,7 +135,15 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
 
   @Override
   public void deleteFile(String name) throws IOException {
-    _files.remove(name);
+    Long length = _files.remove(name);
+    if (length != null) {
+      long blocks = length / _blockSize;
+      _store.delete(new BytesRef(name + LENGTH));
+      _store.delete(new BytesRef(name + LASTMOD));
+      for (long l = 0; l <= blocks; l++) {
+        _store.delete(new BytesRef(name + "/" + l));
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index dc886b7..51997c7 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -23,7 +23,9 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -34,6 +36,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -48,7 +52,8 @@ import org.apache.lucene.util.BytesRef;
 
 public class HdfsKeyValueStore implements Store {
 
-  // private final Log LOG = LogFactory.getLog(HdfsKeyValueStore.class);
+  private static final int DEFAULT_MAX = 64 * 1024 * 1024;
+  private final Log LOG = LogFactory.getLog(HdfsKeyValueStore.class);
 
   static enum OperationType {
     PUT, DELETE
@@ -101,8 +106,17 @@ public class HdfsKeyValueStore implements Store {
     }
   };
 
-  private final ConcurrentNavigableMap<BytesRef, BytesRef> _pointers = new ConcurrentSkipListMap<BytesRef,
BytesRef>(
-      COMP);
+  static class Value {
+    Value(BytesRef bytesRef, Path path) {
+      _bytesRef = bytesRef;
+      _path = path;
+    }
+
+    BytesRef _bytesRef;
+    Path _path;
+  }
+
+  private final ConcurrentNavigableMap<BytesRef, Value> _pointers = new ConcurrentSkipListMap<BytesRef,
Value>(COMP);
   private final Configuration _configuration;
   private final Path _path;
   private final ReentrantReadWriteLock _readWriteLock;
@@ -113,9 +127,15 @@ public class HdfsKeyValueStore implements Store {
   private final ReadLock _readLock;
   private FSDataOutputStream _output;
   private Path _outputPath;
+  private final long _maxAmountAllowedPerFile;
   private boolean _isClosed;
 
   public HdfsKeyValueStore(Configuration configuration, Path path) throws IOException {
+    this(configuration, path, DEFAULT_MAX);
+  }
+
+  public HdfsKeyValueStore(Configuration configuration, Path path, long maxAmountAllowedPerFile)
throws IOException {
+    _maxAmountAllowedPerFile = maxAmountAllowedPerFile;
     _configuration = configuration;
     _path = path;
     _configuration.setBoolean("fs.hdfs.impl.disable.cache", true);
@@ -159,15 +179,48 @@ public class HdfsKeyValueStore implements Store {
     _writeLock.lock();
     try {
       Operation op = getPutOperation(OperationType.PUT, key, value);
-      write(op);
-      _pointers.put(BytesRef.deepCopyOf(key), BytesRef.deepCopyOf(value));
+      Path path = write(op);
+      _pointers.put(BytesRef.deepCopyOf(key), new Value(BytesRef.deepCopyOf(value), path));
+      if (!path.equals(_outputPath)) {
+        cleanupOldFiles();
+      }
     } finally {
       _writeLock.unlock();
     }
   }
 
-  private void write(Operation op) throws IOException {
+  private Path write(Operation op) throws IOException {
     op.write(_output);
+    Path p = _outputPath;
+    if (_output.getPos() >= _maxAmountAllowedPerFile) {
+      rollFile();
+    }
+    return p;
+  }
+
+  private void rollFile() throws IOException {
+    LOG.info("Rolling file [" + _outputPath + "]");
+    _output.close();
+    openWriter();
+  }
+
+  public void cleanupOldFiles() throws IOException {
+    FileStatus[] listStatus = _fileSystem.listStatus(_path);
+    Set<Path> existingFiles = new HashSet<Path>();
+    for (FileStatus fileStatus : listStatus) {
+      existingFiles.add(fileStatus.getPath());
+    }
+    Set<Entry<BytesRef, Value>> entrySet = _pointers.entrySet();
+    existingFiles.remove(_outputPath);
+    for (Entry<BytesRef, Value> e : entrySet) {
+      Path p = e.getValue()._path;
+      existingFiles.remove(p);
+    }
+
+    for (Path p : existingFiles) {
+      LOG.info("Removing file no longer referenced [{0}]", p);
+      _fileSystem.delete(p, false);
+    }
   }
 
   private Operation getPutOperation(OperationType put, BytesRef key, BytesRef value) {
@@ -190,11 +243,11 @@ public class HdfsKeyValueStore implements Store {
     ensureOpen();
     _readLock.lock();
     try {
-      BytesRef internalValue = _pointers.get(key);
+      Value internalValue = _pointers.get(key);
       if (internalValue == null) {
         return false;
       }
-      value.copyBytes(internalValue);
+      value.copyBytes(internalValue._bytesRef);
       return true;
     } finally {
       _readLock.unlock();
@@ -280,17 +333,18 @@ public class HdfsKeyValueStore implements Store {
           // End of sync point found
           return;
         }
-        loadIndex(operation);
+        loadIndex(path, operation);
       }
     } finally {
       inputStream.close();
     }
   }
 
-  private void loadIndex(Operation operation) {
+  private void loadIndex(Path path, Operation operation) {
     switch (operation.type) {
     case PUT:
-      _pointers.put(BytesRef.deepCopyOf(getKey(operation.key)), BytesRef.deepCopyOf(getKey(operation.value)));
+      _pointers.put(BytesRef.deepCopyOf(getKey(operation.key)), new Value(BytesRef.deepCopyOf(getKey(operation.value)),
+          path));
       return;
     case DELETE:
       _pointers.remove(getKey(operation.key));

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
index 9f73785..236c7e5 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/JoinDirectory.java
@@ -24,9 +24,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.blur.store.blockcache.LastModified;
+import org.apache.blur.utils.BlurConstants;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IOContext.Context;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 
@@ -91,7 +91,7 @@ public class JoinDirectory extends Directory implements LastModified {
 
   @Override
   public IndexOutput createOutput(String name, IOContext context) throws IOException {
-    if (context.context == Context.MERGE) {
+    if (Thread.currentThread().getName().startsWith(BlurConstants.SHARED_MERGE_SCHEDULER))
{
       addLongTermSyncFile(name);
       return _longTermStorage.createOutput(name, context);
     }
@@ -124,12 +124,12 @@ public class JoinDirectory extends Directory implements LastModified
{
     }
 
     if (!shortNames.isEmpty()) {
-//      System.out.println("Sync short [" + shortNames + "]");
+      // System.out.println("Sync short [" + shortNames + "]");
       _shortTermStorage.sync(shortNames);
     }
 
     if (!longNames.isEmpty()) {
-//      System.out.println("Sync long [" + longNames + "]");
+      // System.out.println("Sync long [" + longNames + "]");
       _longTermStorage.sync(longNames);
     }
   }
@@ -151,10 +151,10 @@ public class JoinDirectory extends Directory implements LastModified
{
   @Override
   public long getFileModified(String name) throws IOException {
     return 0;
-//    if (_shortTermStorage.fileExists(name)) {
-//      return ((LastModified) _shortTermStorage).getFileModified(name);
-//    }
-//    return ((LastModified) _longTermStorage).getFileModified(name);
+    // if (_shortTermStorage.fileExists(name)) {
+    // return ((LastModified) _shortTermStorage).getFileModified(name);
+    // }
+    // return ((LastModified) _longTermStorage).getFileModified(name);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
index 6d2f26f..b8668ae 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
@@ -93,6 +93,28 @@ public class HdfsKeyValueStoreTest {
     store2.close();
   }
 
+  @Test
+  public void testFileRolling() throws IOException {
+    HdfsKeyValueStore store = new HdfsKeyValueStore(_configuration, _path, 1000);
+    FileSystem fileSystem = _path.getFileSystem(_configuration);
+    assertEquals(1, fileSystem.listStatus(_path).length);
+    store.put(new BytesRef("a"), new BytesRef(new byte[2000]));
+    assertEquals(2, fileSystem.listStatus(_path).length);
+    store.close();
+  }
+
+  @Test
+  public void testFileGC() throws IOException {
+    HdfsKeyValueStore store = new HdfsKeyValueStore(_configuration, _path, 1000);
+    FileSystem fileSystem = _path.getFileSystem(_configuration);
+    assertEquals(1, fileSystem.listStatus(_path).length);
+    store.put(new BytesRef("a"), new BytesRef(new byte[2000]));
+    assertEquals(2, fileSystem.listStatus(_path).length);
+    store.put(new BytesRef("a"), new BytesRef(new byte[2000]));
+    assertEquals(2, fileSystem.listStatus(_path).length);
+    store.close();
+  }
+
   private BytesRef toBytesRef(String s) {
     return new BytesRef(s);
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-thrift/src/main/java/org/apache/blur/thrift/util/LoadData.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/LoadData.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/LoadData.java
index cf5f62a..ddf6321 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/util/LoadData.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/LoadData.java
@@ -102,7 +102,9 @@ public class LoadData {
       }
       countRow++;
     }
-    client.mutateBatch(mutations);
+    if (!mutations.isEmpty()) {
+      client.mutateBatch(mutations);
+    }
     printPerformance(out, countRow, countRecord, start, s, numberRows);
   }
 
@@ -112,8 +114,7 @@ public class LoadData {
     double recordRate = countRecord / seconds;
     double rowRate = countRow / seconds;
     double avgRowRate = i / totalSeconds;
-    out.printf("Rows indexed [%d] at Avg Rows [%f/s] Rows [%f/s] Records [%f/s]%n", i, avgRowRate,
rowRate,
-        recordRate);
+    out.printf("Rows indexed [%d] at Avg Rows [%f/s] Rows [%f/s] Records [%f/s]%n", i, avgRowRate,
rowRate, recordRate);
     out.flush();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6d62722b/blur-util/src/main/java/org/apache/blur/trace/Trace.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/trace/Trace.java b/blur-util/src/main/java/org/apache/blur/trace/Trace.java
index 7841806..3cebe10 100644
--- a/blur-util/src/main/java/org/apache/blur/trace/Trace.java
+++ b/blur-util/src/main/java/org/apache/blur/trace/Trace.java
@@ -109,16 +109,18 @@ public class Trace {
     return new Parameter(name, value);
   }
 
-  private static void setupTraceOnNewThread(TraceCollector parentCollector, String requestId,
int traceScope) {
+  private static TraceCollector setupTraceOnNewThread(TraceCollector parentCollector, String
requestId, int traceScope) {
+    TraceCollector existing = _tracer.get();
     TraceCollector traceCollector = new TraceCollector(parentCollector, requestId);
     TracerImpl tracer = new TracerImpl(traceCollector, parentCollector.getNextId(), traceScope,
requestId);
     parentCollector.add(tracer);
     _tracer.set(traceCollector);
+    return existing;
   }
 
-  private static void tearDownTraceOnNewThread() {
+  private static void tearDownTraceOnNewThread(TraceCollector old) {
     TraceCollector collector = _tracer.get();
-    _tracer.set(null);
+    _tracer.set(old);
     if (collector != null) {
       collector.finished();
     }
@@ -164,13 +166,13 @@ public class Trace {
       return new Runnable() {
         @Override
         public void run() {
-          setupTraceOnNewThread(tc, Long.toString(requestId), traceScope);
+          TraceCollector existing = setupTraceOnNewThread(tc, Long.toString(requestId), traceScope);
           Tracer t = Trace.trace("executing runnable", Trace.param(REQUEST_ID, requestId));
           try {
             runnable.run();
           } finally {
             t.done();
-            tearDownTraceOnNewThread();
+            tearDownTraceOnNewThread(existing);
           }
         }
       };
@@ -192,13 +194,13 @@ public class Trace {
       return new Callable<V>() {
         @Override
         public V call() throws Exception {
-          setupTraceOnNewThread(tc, Long.toString(requestId), traceScope);
+          TraceCollector existing = setupTraceOnNewThread(tc, Long.toString(requestId), traceScope);
           Tracer t = Trace.trace("executing callable", Trace.param(REQUEST_ID, requestId));
           try {
             return callable.call();
           } finally {
             t.done();
-            tearDownTraceOnNewThread();
+            tearDownTraceOnNewThread(existing);
           }
         }
       };


Mime
View raw message