incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/3] git commit: Changing timers to no longer be static.
Date Wed, 29 Oct 2014 01:26:26 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 2f3c81030 -> ce2179ed5


Changing timers to no longer be static.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/6a18af81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/6a18af81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/6a18af81

Branch: refs/heads/master
Commit: 6a18af81531bfc33e8e929b2294884f77c477c7f
Parents: 2f3c810
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Oct 28 21:22:03 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Oct 28 21:22:03 2014 -0400

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java     | 14 ++++--
 .../manager/indexserver/LocalIndexServer.java   | 13 +++++-
 .../apache/blur/manager/writer/BlurIndex.java   |  3 +-
 .../blur/manager/writer/BlurIndexReadOnly.java  |  2 +-
 .../manager/writer/BlurIndexSimpleWriter.java   | 10 +++--
 .../blur/manager/writer/IndexImporter.java      | 15 ++-----
 .../org/apache/blur/server/TableContext.java    |  8 ++--
 .../blur/thrift/ThriftBlurShardServer.java      | 23 ++++++++--
 .../blur/command/ShardCommandManagerTest.java   |  2 +-
 .../writer/BlurIndexSimpleWriterTest.java       |  7 ++-
 .../blur/manager/writer/IndexImporterTest.java  | 10 ++++-
 .../blur/store/MessingWithPermissions.java      | 47 ++++++++++++++++++++
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      |  5 ++-
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 20 +++------
 .../FastHdfsKeyValueDirectoryTestSuite.java     |  9 +++-
 .../hdfs_v2/FastHdfsKeyValueDirectoryTest.java  | 10 ++++-
 .../store/hdfs_v2/HdfsKeyValueStoreTest.java    | 30 ++++++++-----
 17 files changed, 165 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 47b8aed..6b2fe8a 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
@@ -112,13 +113,18 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
   private final Closer _closer;
   private long _shortDelay = 250;
   private final int _minimumNumberOfNodes;
+  private final Timer _hdfsKeyValueTimer;
+  private final Timer _indexImporterTimer;
 
   public DistributedIndexServer(Configuration configuration, ZooKeeper zookeeper, ClusterStatus
clusterStatus,
       BlurFilterCache filterCache, BlockCacheDirectoryFactory blockCacheDirectoryFactory,
       DistributedLayoutFactory distributedLayoutFactory, String cluster, String nodeName,
long safeModeDelay,
       int shardOpenerThreadCount, int maxMergeThreads, int internalSearchThreads,
-      int minimumNumberOfNodesBeforeExitingSafeMode) throws KeeperException, InterruptedException
{
+      int minimumNumberOfNodesBeforeExitingSafeMode, Timer hdfsKeyValueTimer, Timer indexImporterTimer)
+      throws KeeperException, InterruptedException {
     super(clusterStatus, configuration, nodeName, cluster);
+    _indexImporterTimer = indexImporterTimer;
+    _hdfsKeyValueTimer = hdfsKeyValueTimer;
     _minimumNumberOfNodes = minimumNumberOfNodesBeforeExitingSafeMode;
     _running.set(true);
     _closer = Closer.create();
@@ -496,8 +502,8 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
     String scheme = uri.getScheme();
     if (scheme != null && scheme.equals("hdfs")) {
       LOG.info("Using Fast HDFS directory implementation on shard [{0}] for table [{1}]",
shard, table);
-      FastHdfsKeyValueDirectory shortTermStorage = new FastHdfsKeyValueDirectory(_configuration,
new Path(hdfsDirPath,
-          "fast"));
+      FastHdfsKeyValueDirectory shortTermStorage = new FastHdfsKeyValueDirectory(_hdfsKeyValueTimer,
_configuration,
+          new Path(hdfsDirPath, "fast"));
       directory = new JoinDirectory(longTermStorage, shortTermStorage);
     } else {
       directory = longTermStorage;
@@ -513,7 +519,7 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
     }
 
     BlurIndex index = tableContext.newInstanceBlurIndex(shardContext, directory, _mergeScheduler,
_searchExecutor,
-        _indexCloser);
+        _indexCloser, _indexImporterTimer);
 
     if (_clusterStatus.isReadOnly(true, _cluster, table)) {
       index = new BlurIndexReadOnly(index);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
index 1fd82b6..3cad08b 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
@@ -18,6 +18,7 @@ package org.apache.blur.manager.indexserver;
  */
 import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -27,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.Timer;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -67,12 +69,14 @@ public class LocalIndexServer extends AbstractIndexServer {
   private final Closer _closer;
   private final boolean _ramDir;
   private final BlurIndexCloser _indexCloser;
+  private final Timer _timer;
 
   public LocalIndexServer(TableDescriptor tableDescriptor) throws IOException {
     this(tableDescriptor, false);
   }
 
   public LocalIndexServer(TableDescriptor tableDescriptor, boolean ramDir) throws IOException
{
+    _timer = new Timer("Index Importer", true);
     _closer = Closer.create();
     _tableContext = TableContext.create(tableDescriptor);
     _mergeScheduler = _closer.register(new SharedMergeScheduler(3));
@@ -80,6 +84,13 @@ public class LocalIndexServer extends AbstractIndexServer {
     _closer.register(new CloseableExecutorService(_searchExecutor));
     _ramDir = ramDir;
     _indexCloser = _closer.register(new BlurIndexCloser());
+    _closer.register(new Closeable() {
+      @Override
+      public void close() throws IOException {
+        _timer.cancel();
+        _timer.purge();
+      }
+    });
     getIndexes(_tableContext.getTable());
   }
 
@@ -157,7 +168,7 @@ public class LocalIndexServer extends AbstractIndexServer {
   private BlurIndex openIndex(String table, String shard, Directory dir) throws CorruptIndexException,
IOException {
     ShardContext shardContext = ShardContext.create(_tableContext, shard);
     BlurIndexSimpleWriter index = new BlurIndexSimpleWriter(shardContext, dir, _mergeScheduler,
_searchExecutor,
-        _indexCloser);
+        _indexCloser, _timer);
     return index;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index 99dbd6c..1ca8f0c 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -18,6 +18,7 @@ package org.apache.blur.manager.writer;
  */
 import java.io.IOException;
 import java.util.List;
+import java.util.Timer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,7 +42,7 @@ public abstract class BlurIndex {
   protected ShardContext _shardContext;
 
   public BlurIndex(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
-      ExecutorService searchExecutor, BlurIndexCloser indexCloser) throws IOException {
+      ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer indexImporterTimer)
throws IOException {
     _shardContext = shardContext;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
index 12f194f..c2aee75 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
@@ -28,7 +28,7 @@ public class BlurIndexReadOnly extends BlurIndex {
   private final BlurIndex _blurIndex;
 
   public BlurIndexReadOnly(BlurIndex blurIndex) throws IOException {
-    super(null, null, null, null, null);
+    super(null, null, null, null, null, null);
     _blurIndex = blurIndex;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index bbc57d4..1880868 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -22,6 +22,7 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Timer;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -86,10 +87,12 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private final AtomicInteger _writesWaiting = new AtomicInteger();
   private final BlockingQueue<RowMutation> _queue;
   private final MutationQueueProcessor _mutationQueueProcessor;
+  private final Timer _indexImporterTimer;
 
   public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler
mergeScheduler,
-      final ExecutorService searchExecutor, BlurIndexCloser indexCloser) throws IOException
{
-    super(shardContext, directory, mergeScheduler, searchExecutor, indexCloser);
+      final ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer indexImporterTimer)
throws IOException {
+    super(shardContext, directory, mergeScheduler, searchExecutor, indexCloser, indexImporterTimer);
+    _indexImporterTimer = indexImporterTimer;
     _searchThreadPool = searchExecutor;
     _shardContext = shardContext;
     _tableContext = _shardContext.getTableContext();
@@ -156,7 +159,8 @@ public class BlurIndexSimpleWriter extends BlurIndex {
           synchronized (_writer) {
             _writer.notify();
           }
-          _indexImporter = new IndexImporter(BlurIndexSimpleWriter.this, _shardContext, TimeUnit.SECONDS,
10);
+          _indexImporter = new IndexImporter(_indexImporterTimer, BlurIndexSimpleWriter.this,
_shardContext,
+              TimeUnit.SECONDS, 10);
         } catch (IOException e) {
           LOG.error("Unknown error on index writer open.", e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
index 0d1d19e..366713c 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -65,16 +65,6 @@ public class IndexImporter extends TimerTask implements Closeable {
   private static final String INUSE = ".inuse";
   private static final String BADINDEX = ".badindex";
   private static final Lock _globalLock = new ReentrantReadWriteLock().writeLock();
-  private static final Timer _timer;
-
-  static {
-    _timer = new Timer("IndexImporter", true);
-  }
-
-  public static void closeIndexImporterTimer() {
-    _timer.cancel();
-    _timer.purge();
-  }
 
   private final static Log LOG = LogFactory.getLog(IndexImporter.class);
 
@@ -86,12 +76,13 @@ public class IndexImporter extends TimerTask implements Closeable {
 
   private long _lastCleanup;
 
-  public IndexImporter(BlurIndex blurIndex, ShardContext shardContext, TimeUnit refreshUnit,
long refreshAmount) {
+  public IndexImporter(Timer indexImporterTimer, BlurIndex blurIndex, ShardContext shardContext,
TimeUnit refreshUnit,
+      long refreshAmount) {
     _blurIndex = blurIndex;
     _shardContext = shardContext;
 
     long period = refreshUnit.toMillis(refreshAmount);
-    _timer.schedule(this, period, period);
+    indexImporterTimer.schedule(this, period, period);
     _table = _shardContext.getTableContext().getTable();
     _shard = _shardContext.getShard();
     _cleanupDelay = TimeUnit.MINUTES.toMillis(10);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
index 9e07f69..b56bf40 100644
--- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -30,6 +30,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Timer;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -321,7 +322,7 @@ public class TableContext implements Cloneable {
 
   @SuppressWarnings("unchecked")
   public BlurIndex newInstanceBlurIndex(ShardContext shardContext, Directory dir, SharedMergeScheduler
mergeScheduler,
-      ExecutorService searchExecutor, BlurIndexCloser indexCloser) throws IOException {
+      ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer indexImporterTimer)
throws IOException {
 
     String className = _blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, BlurIndexSimpleWriter.class.getName());
 
@@ -333,7 +334,8 @@ public class TableContext implements Cloneable {
     }
     Constructor<? extends BlurIndex> constructor = findConstructor(clazz);
     try {
-      return constructor.newInstance(shardContext, dir, mergeScheduler, searchExecutor, indexCloser);
+      return constructor
+          .newInstance(shardContext, dir, mergeScheduler, searchExecutor, indexCloser, indexImporterTimer);
     } catch (InstantiationException e) {
       throw new IOException(e);
     } catch (IllegalAccessException e) {
@@ -348,7 +350,7 @@ public class TableContext implements Cloneable {
   private Constructor<? extends BlurIndex> findConstructor(Class<? extends BlurIndex>
clazz) throws IOException {
     try {
       return clazz.getConstructor(new Class[] { ShardContext.class, Directory.class, SharedMergeScheduler.class,
-          ExecutorService.class, BlurIndexCloser.class });
+          ExecutorService.class, BlurIndexCloser.class, Timer.class });
     } catch (NoSuchMethodException e) {
       throw new IOException(e);
     } catch (SecurityException e) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 7d0b0ef..61c11fc 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -55,10 +55,13 @@ import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT_DEFAULT;
 import static org.apache.blur.utils.BlurUtil.quietClose;
 
+import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.BlurConfiguration;
@@ -204,9 +207,12 @@ public class ThriftBlurShardServer extends ThriftServer {
     int minimumNumberOfNodesBeforeExitingSafeMode = configuration.getInt(
         BLUR_SHARD_SERVER_MINIMUM_BEFORE_SAFEMODE_EXIT, 0);
     int internalSearchThreads = configuration.getInt(BLUR_SHARD_INTERNAL_SEARCH_THREAD_COUNT,
16);
+    final Timer hdfsKeyValueTimer = new Timer("HDFS KV Store", true);
+    final Timer indexImporterTimer = new Timer("IndexImporter", true);
     final DistributedIndexServer indexServer = new DistributedIndexServer(config, zooKeeper,
clusterStatus,
         filterCache, blockCacheDirectoryFactory, distributedLayoutFactory, cluster, nodeName,
safeModeDelay,
-        shardOpenerThreadCount, maxMergeThreads, internalSearchThreads, minimumNumberOfNodesBeforeExitingSafeMode);
+        shardOpenerThreadCount, maxMergeThreads, internalSearchThreads, minimumNumberOfNodesBeforeExitingSafeMode,
+        hdfsKeyValueTimer, indexImporterTimer);
 
     BooleanQuery.setMaxClauseCount(configuration.getInt(BLUR_MAX_CLAUSE_COUNT, 1024));
 
@@ -309,8 +315,9 @@ public class ThriftBlurShardServer extends ThriftServer {
       @Override
       public void shutdown() {
         ThreadWatcher threadWatcher = ThreadWatcher.instance();
-        quietClose(blockCacheDirectoryFactory, commandManager, traceStorage, refresher, server,
shardServer,
-            indexManager, indexServer, threadWatcher, clusterStatus, zooKeeper, httpServer);
+        quietClose(makeCloseable(hdfsKeyValueTimer), makeCloseable(indexImporterTimer), blockCacheDirectoryFactory,
+            commandManager, traceStorage, refresher, server, shardServer, indexManager, indexServer,
threadWatcher,
+            clusterStatus, zooKeeper, httpServer);
       }
     };
     server.setShutdown(shutdown);
@@ -318,6 +325,16 @@ public class ThriftBlurShardServer extends ThriftServer {
     return server;
   }
 
+  protected static Closeable makeCloseable(final Timer timer) {
+    return new Closeable() {
+      @Override
+      public void close() throws IOException {
+        timer.cancel();
+        timer.purge();
+      }
+    };
+  }
+
   @SuppressWarnings("unchecked")
   private static BlurFilterCache getFilterCache(BlurConfiguration configuration) {
     String blurFilterCacheClass = configuration.get(BLUR_SHARD_FILTER_CACHE_CLASS);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
index 5fec597..83dd232 100644
--- a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
@@ -322,7 +322,7 @@ public class ShardCommandManagerTest {
 
   protected BlurIndex getNullBlurIndex(String shard) throws IOException {
     ShardContext shardContext = ShardContext.create(getTableContextFactory().getTableContext("test"),
shard);
-    return new BlurIndex(shardContext, null, null, null, null) {
+    return new BlurIndex(shardContext, null, null, null, null, null) {
 
       @Override
       public void removeSnapshot(String name) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
index 5d54b75..c5efdb4 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.Timer;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 
@@ -76,9 +77,11 @@ public class BlurIndexSimpleWriterTest {
   private SharedMergeScheduler _mergeScheduler;
   private String uuid;
   private BlurIndexCloser _closer;
+  private Timer _timer;
 
   @Before
   public void setup() throws IOException {
+    _timer = new Timer("Index Importer", true);
     TableContext.clear();
     _base = new File(TMPDIR, "blur-index-writer-test");
     rmr(_base);
@@ -114,11 +117,13 @@ public class BlurIndexSimpleWriterTest {
     path.mkdirs();
     FSDirectory directory = FSDirectory.open(path);
     ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
-    _writer = new BlurIndexSimpleWriter(shardContext, directory, _mergeScheduler, _service,
_closer);
+    _writer = new BlurIndexSimpleWriter(shardContext, directory, _mergeScheduler, _service,
_closer, _timer);
   }
 
   @After
   public void tearDown() throws IOException {
+    _timer.cancel();
+    _timer.purge();
     _writer.close();
     _mergeScheduler.close();
     _service.shutdownNow();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index 756cc08..18349f9 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.List;
 import java.util.Random;
+import java.util.Timer;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,6 +72,7 @@ public class IndexImporterTest {
   private Path _inUsePath;
   private Path _shardPath;
   private HdfsDirectory _mainDirectory;
+  private Timer _timer;
 
   @Before
   public void setup() throws IOException {
@@ -81,6 +83,7 @@ public class IndexImporterTest {
     _fileSystem.delete(_base, true);
     _fileSystem.mkdirs(_base);
     setupWriter(_configuration);
+    _timer = new Timer("Index Importer", true);
   }
 
   private void setupWriter(Configuration configuration) throws IOException {
@@ -116,11 +119,12 @@ public class IndexImporterTest {
     _mainWriter = new IndexWriter(_mainDirectory, conf.clone());
     BufferStore.initNewBuffer(128, 128 * 128);
 
-    _indexImporter = new IndexImporter(getBlurIndex(shardContext, _mainDirectory), shardContext,
TimeUnit.MINUTES, 10);
+    _indexImporter = new IndexImporter(_timer, getBlurIndex(shardContext, _mainDirectory),
shardContext,
+        TimeUnit.MINUTES, 10);
   }
 
   private BlurIndex getBlurIndex(ShardContext shardContext, final Directory mainDirectory)
throws IOException {
-    return new BlurIndex(shardContext, mainDirectory, null, null, null) {
+    return new BlurIndex(shardContext, mainDirectory, null, null, null, null) {
 
       @Override
       public void removeSnapshot(String name) throws IOException {
@@ -198,6 +202,8 @@ public class IndexImporterTest {
 
   @After
   public void tearDown() throws IOException {
+    _timer.cancel();
+    _timer.purge();
     IOUtils.closeQuietly(_commitWriter);
     IOUtils.closeQuietly(_mainWriter);
     IOUtils.closeQuietly(_indexImporter);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-store/src/main/java/org/apache/blur/store/MessingWithPermissions.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/MessingWithPermissions.java b/blur-store/src/main/java/org/apache/blur/store/MessingWithPermissions.java
new file mode 100644
index 0000000..40eaef3
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/MessingWithPermissions.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class MessingWithPermissions {
+
+  public static void main(String[] args) throws IOException, InterruptedException {
+    UserGroupInformation blur = UserGroupInformation.createRemoteUser("blur");
+    final Path path = new Path("/permission_test/tables");
+    final Configuration configuration = new Configuration();
+    blur.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        FileSystem fileSystem = path.getFileSystem(configuration);
+        FileStatus[] listStatus = fileSystem.listStatus(path);
+        for (FileStatus status : listStatus) {
+          System.out.println(status.getPath());
+        }
+        return null;
+      }
+    });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
index c7cb326..0e0bb8c 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -57,9 +58,9 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
   private final Path _path;
   private long _lastGc;
 
-  public FastHdfsKeyValueDirectory(Configuration configuration, Path path) throws IOException
{
+  public FastHdfsKeyValueDirectory(Timer hdfsKeyValueTimer, Configuration configuration,
Path path) throws IOException {
     _path = path;
-    _store = new HdfsKeyValueStore(configuration, path);
+    _store = new HdfsKeyValueStore(hdfsKeyValueTimer, configuration, path);
     BytesRef value = new BytesRef();
     if (_store.get(FILES, value)) {
       String filesString = value.utf8ToString();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index 9a2837e..b48ea7a 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -77,7 +77,6 @@ public class HdfsKeyValueStore implements Store {
   private static final long MAX_OPEN_FOR_WRITING = TimeUnit.MINUTES.toMillis(1);
   private static final long DAEMON_POLL_TIME = TimeUnit.SECONDS.toMillis(5);
   private static final int VERSION_LENGTH = 4;
-  private static final Timer HDFS_KEY_VALUE_TIMER;
 
   static {
     try {
@@ -85,12 +84,6 @@ public class HdfsKeyValueStore implements Store {
     } catch (UnsupportedEncodingException e) {
       throw new RuntimeException(e);
     }
-    HDFS_KEY_VALUE_TIMER = new Timer("HDFS KV Store", true);
-  }
-  
-  public static void stopCleanupTimer() {
-    HDFS_KEY_VALUE_TIMER.cancel();
-    HDFS_KEY_VALUE_TIMER.purge();
   }
 
   static enum OperationType {
@@ -170,11 +163,12 @@ public class HdfsKeyValueStore implements Store {
   private final long _maxAmountAllowedPerFile;
   private boolean _isClosed;
 
-  public HdfsKeyValueStore(Configuration configuration, Path path) throws IOException {
-    this(configuration, path, DEFAULT_MAX);
+  public HdfsKeyValueStore(Timer hdfsKeyValueTimer, Configuration configuration, Path path)
throws IOException {
+    this(hdfsKeyValueTimer, configuration, path, DEFAULT_MAX);
   }
 
-  public HdfsKeyValueStore(Configuration configuration, Path path, long maxAmountAllowedPerFile)
throws IOException {
+  public HdfsKeyValueStore(Timer hdfsKeyValueTimer, Configuration configuration, Path path,
long maxAmountAllowedPerFile)
+      throws IOException {
     _maxAmountAllowedPerFile = maxAmountAllowedPerFile;
     _path = path;
     _fileSystem = _path.getFileSystem(configuration);
@@ -188,7 +182,7 @@ public class HdfsKeyValueStore implements Store {
     }
     removeAnyTruncatedFiles();
     loadIndexes();
-    startDaemon();
+    addToTimer(hdfsKeyValueTimer);
     Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, HDFS_KV, SIZE, path.getParent().toString()), new Gauge<Long>() {
       @Override
       public Long value() {
@@ -212,7 +206,7 @@ public class HdfsKeyValueStore implements Store {
     }
   }
 
-  private void startDaemon() {
+  private void addToTimer(Timer hdfsKeyValueTimer) {
     _writeLock.lock();
     try {
       try {
@@ -223,7 +217,7 @@ public class HdfsKeyValueStore implements Store {
     } finally {
       _writeLock.unlock();
     }
-    HDFS_KEY_VALUE_TIMER.schedule(new TimerTask() {
+    hdfsKeyValueTimer.schedule(new TimerTask() {
       @Override
       public void run() {
         _writeLock.lock();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-store/src/test/java/org/apache/blur/store/FastHdfsKeyValueDirectoryTestSuite.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/FastHdfsKeyValueDirectoryTestSuite.java b/blur-store/src/test/java/org/apache/blur/store/FastHdfsKeyValueDirectoryTestSuite.java
index d33104f..a3be66c 100644
--- a/blur-store/src/test/java/org/apache/blur/store/FastHdfsKeyValueDirectoryTestSuite.java
+++ b/blur-store/src/test/java/org/apache/blur/store/FastHdfsKeyValueDirectoryTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.blur.store;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Timer;
 
 import org.apache.blur.store.hdfs_v2.FastHdfsKeyValueDirectory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,12 +29,15 @@ import org.junit.Test;
 
 public class FastHdfsKeyValueDirectoryTestSuite extends BaseDirectoryTestSuite {
 
+  private Timer _timer;
+
   @Override
   protected Directory setupDirectory() throws IOException {
     URI uri = new File(file, "hdfs").toURI();
     Path hdfsDirPath = new Path(uri.toString());
     Configuration conf = new Configuration();
-    return new FastHdfsKeyValueDirectory(conf, hdfsDirPath);
+    _timer = new Timer("IndexImporter", true);
+    return new FastHdfsKeyValueDirectory(_timer, conf, hdfsDirPath);
   }
 
   @Test
@@ -42,7 +46,8 @@ public class FastHdfsKeyValueDirectoryTestSuite extends BaseDirectoryTestSuite {
 
   @Override
   protected void close() throws IOException {
-
+    _timer.cancel();
+    _timer.purge();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
index cec2ebf..26d75a4 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectoryTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Timer;
 
 import org.apache.blur.HdfsMiniClusterUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -45,16 +46,21 @@ public class FastHdfsKeyValueDirectoryTest {
 
   private Configuration _configuration = new Configuration();
   private static MiniDFSCluster _cluster;
+
+  private static Timer _timer;
   private Path _path;
 
   @BeforeClass
   public static void startCluster() {
     Configuration conf = new Configuration();
     _cluster = HdfsMiniClusterUtil.startDfs(conf, true, TMPDIR.getAbsolutePath());
+    _timer = new Timer("IndexImporter", true);
   }
 
   @AfterClass
   public static void stopCluster() {
+    _timer.cancel();
+    _timer.purge();
     HdfsMiniClusterUtil.shutdownDfs(_cluster);
   }
 
@@ -68,8 +74,8 @@ public class FastHdfsKeyValueDirectoryTest {
   @Test
   public void testMultipleWritersOpenOnSameDirectory() throws IOException {
     IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
-    FastHdfsKeyValueDirectory directory = new FastHdfsKeyValueDirectory(_configuration,
-        new Path(_path, "test_multiple"));
+    FastHdfsKeyValueDirectory directory = new FastHdfsKeyValueDirectory(_timer, _configuration, new Path(_path,
+        "test_multiple"));
     IndexWriter writer1 = new IndexWriter(directory, config.clone());
     addDoc(writer1, getDoc(1));
     IndexWriter writer2 = new IndexWriter(directory, config.clone());

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a18af81/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
index 2031017..0122c63 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map.Entry;
+import java.util.Timer;
 
 import org.apache.blur.HdfsMiniClusterUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -42,16 +43,21 @@ public class HdfsKeyValueStoreTest {
 
   private Configuration _configuration = new Configuration();
   private static MiniDFSCluster _cluster;
+
+  private static Timer _timer;
   private Path _path;
 
   @BeforeClass
   public static void startCluster() {
     Configuration conf = new Configuration();
     _cluster = HdfsMiniClusterUtil.startDfs(conf, true, TMPDIR.getAbsolutePath());
+    _timer = new Timer("IndexImporter", true);
   }
 
   @AfterClass
   public static void stopCluster() {
+    _timer.cancel();
+    _timer.purge();
     HdfsMiniClusterUtil.shutdownDfs(_cluster);
   }
 
@@ -64,7 +70,7 @@ public class HdfsKeyValueStoreTest {
 
   @Test
   public void testPutGet() throws IOException {
-    HdfsKeyValueStore store = new HdfsKeyValueStore(_configuration, _path);
+    HdfsKeyValueStore store = new HdfsKeyValueStore(_timer, _configuration, _path);
     store.put(toBytesRef("a"), toBytesRef("value1"));
     store.put(toBytesRef("b"), toBytesRef("value2"));
     store.sync();
@@ -78,7 +84,7 @@ public class HdfsKeyValueStoreTest {
 
   @Test
   public void testPutGetDelete() throws IOException {
-    HdfsKeyValueStore store = new HdfsKeyValueStore(_configuration, _path);
+    HdfsKeyValueStore store = new HdfsKeyValueStore(_timer, _configuration, _path);
     store.put(toBytesRef("a"), toBytesRef("value1"));
     store.put(toBytesRef("b"), toBytesRef("value2"));
     store.sync();
@@ -96,7 +102,7 @@ public class HdfsKeyValueStoreTest {
 
   @Test
   public void testPutGetReopen() throws IOException {
-    HdfsKeyValueStore store1 = new HdfsKeyValueStore(_configuration, _path);
+    HdfsKeyValueStore store1 = new HdfsKeyValueStore(_timer, _configuration, _path);
     store1.put(toBytesRef("a"), toBytesRef("value1"));
     store1.put(toBytesRef("b"), toBytesRef("value2"));
     store1.sync();
@@ -107,7 +113,7 @@ public class HdfsKeyValueStoreTest {
     assertEquals(new BytesRef("value2"), value1);
     store1.close();
 
-    HdfsKeyValueStore store2 = new HdfsKeyValueStore(_configuration, _path);
+    HdfsKeyValueStore store2 = new HdfsKeyValueStore(_timer, _configuration, _path);
     BytesRef value2 = new BytesRef();
     store2.get(toBytesRef("a"), value2);
     assertEquals(new BytesRef("value1"), value2);
@@ -118,7 +124,7 @@ public class HdfsKeyValueStoreTest {
 
   @Test
   public void testFileRolling() throws IOException {
-    HdfsKeyValueStore store = new HdfsKeyValueStore(_configuration, _path, 1000);
+    HdfsKeyValueStore store = new HdfsKeyValueStore(_timer, _configuration, _path, 1000);
     FileSystem fileSystem = _path.getFileSystem(_configuration);
     store.put(new BytesRef("a"), new BytesRef(""));
     assertEquals(1, fileSystem.listStatus(_path).length);
@@ -129,7 +135,7 @@ public class HdfsKeyValueStoreTest {
 
   @Test
   public void testFileGC() throws IOException {
-    HdfsKeyValueStore store = new HdfsKeyValueStore(_configuration, _path, 1000);
+    HdfsKeyValueStore store = new HdfsKeyValueStore(_timer, _configuration, _path, 1000);
     store.put(new BytesRef("a"), new BytesRef(""));
     FileSystem fileSystem = _path.getFileSystem(_configuration);
     assertEquals(1, fileSystem.listStatus(_path).length);
@@ -143,11 +149,11 @@ public class HdfsKeyValueStoreTest {
 
   // @Test
   public void testTwoKeyStoreInstancesWritingAtTheSameTime() throws IOException {
-    HdfsKeyValueStore store1 = new HdfsKeyValueStore(_configuration, _path);
+    HdfsKeyValueStore store1 = new HdfsKeyValueStore(_timer, _configuration, _path);
     listFiles();
     store1.put(new BytesRef("a1"), new BytesRef(new byte[2000]));
     listFiles();
-    HdfsKeyValueStore store2 = new HdfsKeyValueStore(_configuration, _path);
+    HdfsKeyValueStore store2 = new HdfsKeyValueStore(_timer, _configuration, _path);
     listFiles();
     store2.put(new BytesRef("a1"), new BytesRef(new byte[1000]));
     listFiles();
@@ -169,7 +175,7 @@ public class HdfsKeyValueStoreTest {
     store1.close();
     store2.close();
 
-    HdfsKeyValueStore store3 = new HdfsKeyValueStore(_configuration, _path);
+    HdfsKeyValueStore store3 = new HdfsKeyValueStore(_timer, _configuration, _path);
     Iterable<Entry<BytesRef, BytesRef>> scan = store3.scan(null);
     for (Entry<BytesRef, BytesRef> e : scan) {
       System.out.println(e.getValue().length);
@@ -179,9 +185,9 @@ public class HdfsKeyValueStoreTest {
 
   @Test
   public void testTwoKeyStoreInstancesWritingAtTheSameTimeSmallFiles() throws IOException {
-    HdfsKeyValueStore store1 = new HdfsKeyValueStore(_configuration, _path, 1000);
+    HdfsKeyValueStore store1 = new HdfsKeyValueStore(_timer, _configuration, _path, 1000);
     store1.put(new BytesRef("a1"), new BytesRef(new byte[2000]));
-    HdfsKeyValueStore store2 = new HdfsKeyValueStore(_configuration, _path, 1000);
+    HdfsKeyValueStore store2 = new HdfsKeyValueStore(_timer, _configuration, _path, 1000);
     store2.put(new BytesRef("a1"), new BytesRef(new byte[1000]));
     try {
       store1.put(new BytesRef("a2"), new BytesRef(new byte[2000]));
@@ -195,7 +201,7 @@ public class HdfsKeyValueStoreTest {
     store2.sync();
     store2.close();
 
-    HdfsKeyValueStore store3 = new HdfsKeyValueStore(_configuration, _path);
+    HdfsKeyValueStore store3 = new HdfsKeyValueStore(_timer, _configuration, _path);
     Iterable<Entry<BytesRef, BytesRef>> scan = store3.scan(null);
     for (Entry<BytesRef, BytesRef> e : scan) {
       System.out.println(e.getValue().length);


Mime
View raw message