incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Fixed BLUR-282, basically the way the IndexInputCloser was implemented was not working correctly. This would let file references build until a full GC, during that time the file in HDFS that were referenced would not be removed. This
Date Sat, 19 Oct 2013 21:51:56 GMT
Fixed BLUR-282. The way the IndexInputCloser was implemented was not working correctly.
 File references would build up until a full GC, and during that time the files in HDFS that
were referenced would not be removed.  This would have caused a lot of problems in a large
system.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/b9637861
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/b9637861
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/b9637861

Branch: refs/heads/master
Commit: b9637861aa6d810d2b4bead6eb19dd6c5f9b7e22
Parents: adf98da
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Oct 19 17:46:19 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Oct 19 17:51:34 2013 -0400

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java     |   5 +-
 .../manager/indexserver/LocalIndexServer.java   |   5 +-
 .../blur/manager/writer/BlurNRTIndex.java       | 102 +++++++--------
 .../manager/writer/BlurIndexReaderTest.java     |   4 -
 .../blur/manager/writer/BlurNRTIndexTest.java   |   6 +-
 .../writer/DirectoryReferenceCounterTest.java   |   5 +-
 .../refcounter/DirectoryReferenceCounter.java   |  67 ++++------
 .../refcounter/DirectoryReferenceFileGC.java    |   2 +-
 .../store/refcounter/IndexInputCloser.java      |  94 --------------
 .../store/refcounter/IndexInputReference.java   |  79 ++++++++++++
 .../store/blockcache_v2/CacheIndexInput.java    | 126 +++++++++++--------
 11 files changed, 237 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index f0e6590..bf50852 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -41,7 +41,6 @@ import org.apache.blur.concurrent.Executors;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
 import org.apache.blur.manager.BlurFilterCache;
 import org.apache.blur.manager.clusterstatus.ClusterStatus;
 import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
@@ -109,7 +108,6 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer
{
   private final DirectoryReferenceFileGC _gc;
   private final WatchChildren _watchOnlineShards;
   private final SharedMergeScheduler _mergeScheduler;
-  private final IndexInputCloser _indexInputCloser;
   private final ExecutorService _searchExecutor;
   private final BlurIndexRefresher _refresher;
   private final BlurIndexCloser _indexCloser;
@@ -153,7 +151,6 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer
{
     _mergeScheduler = _closer.register(new SharedMergeScheduler());
     
     
-    _indexInputCloser = _closer.register(new IndexInputCloser());
     _refresher = _closer.register(new BlurIndexRefresher());
     _indexCloser = _closer.register(new BlurIndexCloser());
     _timerCacheFlush = setupFlushCacheTimer();
@@ -470,7 +467,7 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer
{
       BlurIndexReader reader = new BlurIndexReader(shardContext, dir, _refresher, _indexCloser);
       index = reader;
     } else {
-      BlurNRTIndex writer = new BlurNRTIndex(shardContext, _mergeScheduler, _indexInputCloser,
dir, _gc, _searchExecutor);
+      BlurNRTIndex writer = new BlurNRTIndex(shardContext, _mergeScheduler, dir, _gc, _searchExecutor);
       index = writer;
     }
     _filterCache.opening(table, shard, index);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
index 524cfd9..f9652ce 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
@@ -35,7 +35,6 @@ import java.util.concurrent.Executors;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
@@ -62,7 +61,6 @@ public class LocalIndexServer extends AbstractIndexServer {
 
   private final Map<String, Map<String, BlurIndex>> _readersMap = new ConcurrentHashMap<String,
Map<String, BlurIndex>>();
   private final SharedMergeScheduler _mergeScheduler;
-  private final IndexInputCloser _indexInputCloser;
   private final DirectoryReferenceFileGC _gc;
   private final ExecutorService _searchExecutor;
   private final TableContext _tableContext;
@@ -72,7 +70,6 @@ public class LocalIndexServer extends AbstractIndexServer {
     _closer = Closer.create();
     _tableContext = TableContext.create(tableDescriptor);
     _mergeScheduler = _closer.register(new SharedMergeScheduler());
-    _indexInputCloser = _closer.register(new IndexInputCloser());
     _gc = _closer.register(new DirectoryReferenceFileGC());
     _searchExecutor = Executors.newCachedThreadPool();
     _closer.register(new CloseableExecutorService(_searchExecutor));
@@ -148,7 +145,7 @@ public class LocalIndexServer extends AbstractIndexServer {
 
   private BlurIndex openIndex(String table, String shard, Directory dir) throws CorruptIndexException,
IOException {
     ShardContext shardContext = ShardContext.create(_tableContext, shard);
-    BlurNRTIndex index = new BlurNRTIndex(shardContext, _mergeScheduler, _indexInputCloser,
dir, _gc, _searchExecutor);
+    BlurNRTIndex index = new BlurNRTIndex(shardContext, _mergeScheduler, dir, _gc, _searchExecutor);
     return index;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index d593089..f9365d4 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -40,7 +40,6 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
 import org.apache.blur.lucene.warmup.TraceableDirectory;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.IndexSearcherClosableNRT;
@@ -95,18 +94,18 @@ public class BlurNRTIndex extends BlurIndex {
   private final ReadWriteLock _lock = new ReentrantReadWriteLock();
   private long _lastRefresh = 0;
 
-  public BlurNRTIndex(ShardContext shardContext, SharedMergeScheduler mergeScheduler, IndexInputCloser
closer,
-      Directory directory, DirectoryReferenceFileGC gc, final ExecutorService searchExecutor)
throws IOException {
+  public BlurNRTIndex(ShardContext shardContext, SharedMergeScheduler mergeScheduler, Directory
directory,
+      DirectoryReferenceFileGC gc, final ExecutorService searchExecutor) throws IOException
{
     _tableContext = shardContext.getTableContext();
     _directory = directory;
     _shardContext = shardContext;
-    
+
     FieldManager fieldManager = _tableContext.getFieldManager();
     Analyzer analyzer = fieldManager.getAnalyzerForIndex();
     IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
     conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
     conf.setSimilarity(_tableContext.getSimilarity());
-    
+
     SnapshotDeletionPolicy sdp;
     if (snapshotsDirectoryExists()) {
       // load existing snapshots
@@ -115,16 +114,16 @@ public class BlurNRTIndex extends BlurIndex {
       sdp = new SnapshotDeletionPolicy(_tableContext.getIndexDeletionPolicy());
     }
     conf.setIndexDeletionPolicy(sdp);
-    conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext, _isClosed));
+//    conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext, _isClosed));
 
     TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
     mergePolicy.setUseCompoundFile(false);
     conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
 
-    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(directory,
gc, closer);
+    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(directory,
gc);
     // This directory allows for warm up by adding tracing ability.
     TraceableDirectory dir = new TraceableDirectory(referenceCounter);
-    
+
     SimpleTimer simpleTimer = new SimpleTimer();
     simpleTimer.start("writerOpen");
     _writer = new BlurIndexWriter(dir, conf, true);
@@ -161,19 +160,19 @@ public class BlurNRTIndex extends BlurIndex {
   }
 
   /**
-   * The snapshots directory contains a file per snapshot.
-   * Name of the file is the snapshot name and it stores the segments filename
+   * The snapshots directory contains a file per snapshot. Name of the file is
+   * the snapshot name and it stores the segments filename
    * 
    * @return Map<String, String>
    * @throws IOException
    */
   private Map<String, String> loadExistingSnapshots() throws IOException {
     Map<String, String> snapshots = new HashMap<String, String>();
-    
+
     FileSystem fileSystem = getFileSystem();
     FileStatus[] status = fileSystem.listStatus(getSnapshotsDirectoryPath());
-    
-    for (int i=0;i<status.length;i++){
+
+    for (int i = 0; i < status.length; i++) {
       FileStatus fileStatus = status[i];
       String snapshotName = fileStatus.getPath().getName();
       // cleanup all tmp files
@@ -181,16 +180,16 @@ public class BlurNRTIndex extends BlurIndex {
         fileSystem.delete(fileStatus.getPath(), true);
         continue;
       }
-      BufferedReader br=new BufferedReader(new InputStreamReader(fileSystem.open(fileStatus.getPath())));
-      String segmentsFilename=br.readLine();
-      if (segmentsFilename != null){
+      BufferedReader br = new BufferedReader(new InputStreamReader(fileSystem.open(fileStatus.getPath())));
+      String segmentsFilename = br.readLine();
+      if (segmentsFilename != null) {
         snapshots.put(snapshotName, segmentsFilename);
       }
     }
     return snapshots;
   }
-  
-  private boolean snapshotsDirectoryExists() throws IOException{
+
+  private boolean snapshotsDirectoryExists() throws IOException {
     Path shardHdfsDirPath = _shardContext.getHdfsDirPath();
     FileSystem fileSystem = getFileSystem();
     Path shardSnapshotsDirPath = new Path(shardHdfsDirPath, SNAPSHOTS_FOLDER_NAME);
@@ -199,7 +198,7 @@ public class BlurNRTIndex extends BlurIndex {
     }
     return false;
   }
-  
+
   @Override
   public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException
{
     _lock.readLock().lock();
@@ -342,37 +341,37 @@ public class BlurNRTIndex extends BlurIndex {
     SnapshotDeletionPolicy snapshotter = getSnapshotter();
     Map<String, String> existingSnapshots = snapshotter.getSnapshots();
     if (existingSnapshots.containsKey(name)) {
-      LOG.error("A Snapshot already exists with the same name [{0}] on [{1}/{2}].", 
-          name, _tableContext.getTable(), _shardContext.getShard());
-      throw new IOException("A Snapshot already exists with the same name [" + name + "]
on " +
-      	"["+_tableContext.getTable()+"/"+_shardContext.getShard()+"].");
+      LOG.error("A Snapshot already exists with the same name [{0}] on [{1}/{2}].", name,
_tableContext.getTable(),
+          _shardContext.getShard());
+      throw new IOException("A Snapshot already exists with the same name [" + name + "]
on " + "["
+          + _tableContext.getTable() + "/" + _shardContext.getShard() + "].");
     }
     _writer.commit();
     IndexCommit indexCommit = snapshotter.snapshot(name);
-    
+
     /*
      * Persist the snapshots info into a tmp file under the snapshots sub-folder
-     * and once writing is finished, close the writer.
-     * Now rename the tmp file to an actual snapshots file.
-     * This make the file write an atomic operation
+     * and once writing is finished, close the writer. Now rename the tmp file
+     * to an actual snapshots file. This make the file write an atomic operation
      * 
-     * The name of the file is the snapshot name and its contents specify the segments file
name
+     * The name of the file is the snapshot name and its contents specify the
+     * segments file name
      */
     String segmentsFilename = indexCommit.getSegmentsFileName();
     FileSystem fileSystem = getFileSystem();
     Path shardSnapshotsDirPath = getSnapshotsDirectoryPath();
     BlurUtil.createPath(fileSystem, shardSnapshotsDirPath);
     Path newTmpSnapshotFile = new Path(shardSnapshotsDirPath, name + SNAPSHOTS_TMPFILE_EXTENSION);
-    BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fileSystem.create(newTmpSnapshotFile,true)));
+    BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fileSystem.create(newTmpSnapshotFile,
true)));
     br.write(segmentsFilename);
     br.close();
-    
+
     // now rename the tmp file
     Path newSnapshotFile = new Path(shardSnapshotsDirPath, name);
     fileSystem.rename(newTmpSnapshotFile, newSnapshotFile);
-    
-    LOG.info("Snapshot [{0}] created successfully on [{1}/{2}].",
-        name, _tableContext.getTable(), _shardContext.getShard());
+
+    LOG.info("Snapshot [{0}] created successfully on [{1}/{2}].", name, _tableContext.getTable(),
+        _shardContext.getShard());
   }
 
   @Override
@@ -381,18 +380,19 @@ public class BlurNRTIndex extends BlurIndex {
     Map<String, String> existingSnapshots = snapshotter.getSnapshots();
     if (existingSnapshots.containsKey(name)) {
       snapshotter.release(name);
-      
-      // now delete the snapshot file stored in the snapshots directory under the shard
+
+      // now delete the snapshot file stored in the snapshots directory under
+      // the shard
       Path snapshotFilePath = new Path(getSnapshotsDirectoryPath(), name);
       getFileSystem().delete(snapshotFilePath, true);
-       
-      LOG.info("Snapshot [{0}] removed successfully from [{1}/{2}].",
-          name, _tableContext.getTable(), _shardContext.getShard());
+
+      LOG.info("Snapshot [{0}] removed successfully from [{1}/{2}].", name, _tableContext.getTable(),
+          _shardContext.getShard());
     } else {
-      LOG.error("No Snapshot exists with the name [{0}] on  [{1}/{2}].", 
-          name, _tableContext.getTable(), _shardContext.getShard());
-      throw new IOException("No Snapshot exists with the name [" + name + "] on " +
-        "["+_tableContext.getTable()+"/"+_shardContext.getShard()+"].");
+      LOG.error("No Snapshot exists with the name [{0}] on  [{1}/{2}].", name, _tableContext.getTable(),
+          _shardContext.getShard());
+      throw new IOException("No Snapshot exists with the name [" + name + "] on " + "[" +
_tableContext.getTable()
+          + "/" + _shardContext.getShard() + "].");
     }
   }
 
@@ -409,25 +409,25 @@ public class BlurNRTIndex extends BlurIndex {
    * @return SnapshotDeletionPolicy
    * @throws IOException
    */
-  private SnapshotDeletionPolicy getSnapshotter() throws IOException{
+  private SnapshotDeletionPolicy getSnapshotter() throws IOException {
     IndexDeletionPolicy idp = _writer.getConfig().getIndexDeletionPolicy();
     if (idp instanceof SnapshotDeletionPolicy) {
-      SnapshotDeletionPolicy snapshotter = (SnapshotDeletionPolicy)idp;
+      SnapshotDeletionPolicy snapshotter = (SnapshotDeletionPolicy) idp;
       return snapshotter;
     } else {
-      LOG.error("The index deletion policy for [{0}/{1}] does not support snapshots.", 
-          _tableContext.getTable(), _shardContext.getShard());
-      throw new IOException("The index deletion policy for ["+_tableContext.getTable()+"/"+_shardContext.getShard()+"]"
+
-      		" does not support snapshots.");
+      LOG.error("The index deletion policy for [{0}/{1}] does not support snapshots.", _tableContext.getTable(),
+          _shardContext.getShard());
+      throw new IOException("The index deletion policy for [" + _tableContext.getTable()
+ "/"
+          + _shardContext.getShard() + "]" + " does not support snapshots.");
     }
   }
-  
+
   public Path getSnapshotsDirectoryPath() throws IOException {
     Path shardHdfsDirPath = _shardContext.getHdfsDirPath();
     return new Path(shardHdfsDirPath, SNAPSHOTS_FOLDER_NAME);
   }
-  
-  private FileSystem getFileSystem() throws IOException{
+
+  private FileSystem getFileSystem() throws IOException {
     Path shardHdfsDirPath = _shardContext.getHdfsDirPath();
     Configuration configuration = _shardContext.getTableContext().getConfiguration();
     return shardHdfsDirPath.getFileSystem(configuration);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
index 6e5234d..4a63b75 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.blur.concurrent.Executors;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
@@ -55,7 +54,6 @@ public class BlurIndexReaderTest {
   private Configuration configuration;
 
   private DirectoryReferenceFileGC gc;
-  private IndexInputCloser closer;
   private SharedMergeScheduler mergeScheduler;
   private BlurIndexReader reader;
 
@@ -72,7 +70,6 @@ public class BlurIndexReaderTest {
 
     mergeScheduler = new SharedMergeScheduler();
     gc = new DirectoryReferenceFileGC();
-    closer = new IndexInputCloser();
 
     configuration = new Configuration();
     service = Executors.newThreadPool("test", 1);
@@ -100,7 +97,6 @@ public class BlurIndexReaderTest {
   public void tearDown() throws IOException {
     reader.close();
     mergeScheduler.close();
-    closer.close();
     gc.close();
     service.shutdownNow();
     refresher.close();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
index 8010323..cafcf46 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.blur.concurrent.Executors;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
@@ -61,7 +60,6 @@ public class BlurNRTIndexTest {
   private Configuration configuration;
 
   private DirectoryReferenceFileGC gc;
-  private IndexInputCloser closer;
   private SharedMergeScheduler mergeScheduler;
   private String uuid;
 
@@ -74,7 +72,6 @@ public class BlurNRTIndexTest {
 
     mergeScheduler = new SharedMergeScheduler();
     gc = new DirectoryReferenceFileGC();
-    closer = new IndexInputCloser();
 
     configuration = new Configuration();
     service = Executors.newThreadPool("test", 10);
@@ -101,14 +98,13 @@ public class BlurNRTIndexTest {
     path.mkdirs();
     FSDirectory directory = FSDirectory.open(path);
     ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
-    writer = new BlurNRTIndex(shardContext, mergeScheduler, closer, directory, gc, service);
+    writer = new BlurNRTIndex(shardContext, mergeScheduler, directory, gc, service);
   }
 
   @After
   public void tearDown() throws IOException {
     writer.close();
     mergeScheduler.close();
-    closer.close();
     gc.close();
     service.shutdownNow();
     rm(base);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
index e077d12..ac31918 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/DirectoryReferenceCounterTest.java
@@ -29,7 +29,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
@@ -78,8 +77,7 @@ public class DirectoryReferenceCounterTest {
   public void testDirectoryReferenceCounter() throws CorruptIndexException, LockObtainFailedException,
IOException, InterruptedException {
     Directory directory = wrap(new RAMDirectory());
     DirectoryReferenceFileGC gc = new DirectoryReferenceFileGC();
-    IndexInputCloser closer = new IndexInputCloser();
-    DirectoryReferenceCounter counter = new DirectoryReferenceCounter(directory, gc, closer);
+    DirectoryReferenceCounter counter = new DirectoryReferenceCounter(directory, gc);
     IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer());
     IndexWriter writer = new IndexWriter(counter, conf);
     int size = 100;
@@ -115,7 +113,6 @@ public class DirectoryReferenceCounterTest {
     last.close();
     writer.close();
     gc.close();
-    closer.close();
   }
 
   private Document getDoc() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
index b4e31f1..9f0181c 100644
--- a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
+++ b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
@@ -39,12 +39,10 @@ public class DirectoryReferenceCounter extends Directory implements DirectoryDec
   private final Directory _directory;
   private final Map<String, AtomicInteger> _refCounters = new ConcurrentHashMap<String,
AtomicInteger>();
   private final DirectoryReferenceFileGC _gc;
-  private final IndexInputCloser _closer;
 
-  public DirectoryReferenceCounter(Directory directory, DirectoryReferenceFileGC gc, IndexInputCloser
closer) {
+  public DirectoryReferenceCounter(Directory directory, DirectoryReferenceFileGC gc) {
     _directory = directory;
     _gc = gc;
-    _closer = closer;
   }
 
   private IndexInput wrap(String name, IndexInput input) {
@@ -53,7 +51,7 @@ public class DirectoryReferenceCounter extends Directory implements DirectoryDec
       counter = new AtomicInteger();
       _refCounters.put(name, counter);
     }
-    return new RefIndexInput(input.toString(), input, counter, _closer);
+    return new RefIndexInput(input.toString(), input, counter);
   }
 
   public void deleteFile(String name) throws IOException {
@@ -95,51 +93,16 @@ public class DirectoryReferenceCounter extends Directory implements DirectoryDec
     return wrap(name, input);
   }
 
-  public static class RefIndexInput extends IndexInput {
+  public static class RefIndexInput extends IndexInputReference {
 
     private IndexInput input;
     private AtomicInteger ref;
-    private boolean closed = false;
-    private IndexInputCloser closer;
 
-    public RefIndexInput(String resourceDescription, IndexInput input, AtomicInteger ref,
IndexInputCloser closer) {
+    public RefIndexInput(String resourceDescription, IndexInput input, AtomicInteger ref)
{
       super(resourceDescription);
       this.input = input;
       this.ref = ref;
-      this.closer = closer;
       ref.incrementAndGet();
-      if (closer != null) {
-        closer.add(this);
-      }
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-      // Seems like not all the clones are being closed...
-      if (!closed) {
-        LOG.debug("[{0}] Last resort closing.", input.toString());
-        close();
-      }
-    }
-
-    @Override
-    public RefIndexInput clone() {
-      RefIndexInput refIndexInput = (RefIndexInput) super.clone();
-      if (closer != null) {
-        closer.add(refIndexInput);
-      }
-      refIndexInput.input = (IndexInput) input.clone();
-      refIndexInput.ref.incrementAndGet();
-      return refIndexInput;
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (!closed) {
-        input.close();
-        ref.decrementAndGet();
-        closed = true;
-      }
     }
 
     @Override
@@ -212,6 +175,28 @@ public class DirectoryReferenceCounter extends Directory implements DirectoryDec
       return input.readStringStringMap();
     }
 
+    @Override
+    public RefIndexInput clone() {
+      RefIndexInput refIndexInput = (RefIndexInput) super.clone();
+      refIndexInput.input = (IndexInput) input.clone();
+      refIndexInput.ref.incrementAndGet();
+      return refIndexInput;
+    }
+
+    @Override
+    protected void closeBase() throws IOException {
+      input.close();
+      ref.decrementAndGet();
+      _isClosed = true;
+    }
+
+    @Override
+    protected void closeClone() throws IOException {
+      input.close();
+      ref.decrementAndGet();
+      _isClosed = true;
+    }
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
index 3e64f66..fa03be6 100644
--- a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
+++ b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
@@ -62,7 +62,7 @@ public class DirectoryReferenceFileGC extends TimerTask implements Closeable
{
         directory.deleteFile(name);
         return true;
       } else {
-        LOG.debug("File [{0}] had too many refs [{1}]", name, counter.get());
+        LOG.info("File [{0}] had too many refs [{1}]", name, counter.get());
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputCloser.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputCloser.java
b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputCloser.java
deleted file mode 100644
index cd4e2d3..0000000
--- a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputCloser.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.blur.lucene.store.refcounter;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.ref.ReferenceQueue;
-import java.lang.ref.WeakReference;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.util.IOUtils;
-
-public class IndexInputCloser implements Runnable, Closeable {
-
-  private static final Log LOG = LogFactory.getLog(IndexInputCloser.class);
-
-  private ReferenceQueue<IndexInput> referenceQueue = new ReferenceQueue<IndexInput>();
-  private Thread daemon;
-  private AtomicBoolean running = new AtomicBoolean();
-  private Collection<IndexInputCloserRef> refs = Collections
-      .newSetFromMap(new ConcurrentHashMap<IndexInputCloserRef, Boolean>());
-
-  static class IndexInputCloserRef extends WeakReference<IndexInput> implements Closeable {
-    public IndexInputCloserRef(IndexInput referent, ReferenceQueue<? super IndexInput> q) {
-      super(referent, q);
-    }
-
-    @Override
-    public void close() throws IOException {
-      IndexInput input = get();
-      if (input != null) {
-        LOG.debug("Closing indexinput [{0}]", input);
-        input.close();
-      }
-    }
-  }
-
-  public IndexInputCloser() {
-    running.set(true);
-    daemon = new Thread(this);
-    daemon.setDaemon(true);
-    daemon.setName("IndexIndexCloser");
-    daemon.start();
-  }
-
-  public void add(IndexInput indexInput) {
-    LOG.debug("Adding [{0}]", indexInput);
-    IndexInputCloserRef ref = new IndexInputCloserRef(indexInput, referenceQueue);
-    refs.add(ref);
-  }
-
-  public void close() {
-    running.set(false);
-    refs.clear();
-    daemon.interrupt();
-  }
-
-  @Override
-  public void run() {
-    while (running.get()) {
-      try {
-        IndexInputCloserRef ref = (IndexInputCloserRef) referenceQueue.remove();
-        LOG.debug("Closing [{0}]", ref);
-        IOUtils.closeWhileHandlingException(ref);
-        refs.remove(ref);
-      } catch (InterruptedException e) {
-        LOG.debug("Interrupted");
-        running.set(false);
-        return;
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputReference.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputReference.java b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputReference.java
new file mode 100644
index 0000000..12fd562
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/IndexInputReference.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.lucene.store.refcounter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.WeakIdentityMap;
+
+public abstract class IndexInputReference extends IndexInput {
+
+  protected boolean _isClosed;
+  protected boolean _isClone;
+  private final WeakIdentityMap<IndexInputReference, Boolean> _clones;
+
+  protected IndexInputReference(String resourceDescription) {
+    super(resourceDescription);
+    _isClone = false;
+    _isClosed = false;
+    _clones = WeakIdentityMap.<IndexInputReference, Boolean> newConcurrentHashMap();
+  }
+
+  @Override
+  public final void close() throws IOException {
+    _clones.remove(this);
+    if (_isClone) {
+      closeClone();
+      return;
+    }
+    for (Iterator<IndexInputReference> it = _clones.keyIterator(); it.hasNext();) {
+      closeCloneQuietly(it.next());
+    }
+    _clones.clear();
+    closeBase();
+  }
+
+  private static void closeCloneQuietly(IndexInputReference ref) {
+    try {
+      if (ref != null) {
+        ref.closeClone();
+      }
+    } catch (IOException ioe) {
+      // ignore
+    }
+  }
+
+  @Override
+  public IndexInput clone() {
+    IndexInputReference clone = (IndexInputReference) super.clone();
+    clone._isClone = true;
+    _clones.put(clone, Boolean.TRUE);
+    return clone;
+  }
+
+  @Override
+  protected final void finalize() throws Throwable {
+    close();
+  }
+
+  protected abstract void closeBase() throws IOException;
+
+  protected abstract void closeClone() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b9637861/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
index e9bd601..1f74edb 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
@@ -20,6 +20,7 @@ package org.apache.blur.store.blockcache_v2;
 import java.io.IOException;
 
 import org.apache.blur.store.buffer.BufferStore;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.IndexInput;
 
 public class CacheIndexInput extends IndexInput {
@@ -39,6 +40,7 @@ public class CacheIndexInput extends IndexInput {
   private long _position;
   private int _blockPosition;
   private boolean _quiet;
+  private boolean _isClosed;
 
   public CacheIndexInput(CacheDirectory directory, String fileName, IndexInput indexInput, Cache cache)
       throws IOException {
@@ -54,10 +56,12 @@ public class CacheIndexInput extends IndexInput {
     _bufferSize = _cache.getFileBufferSize(_directory, _fileName);
     _quiet = _cache.shouldBeQuiet(_directory, _fileName);
     _key.setFileId(_fileId);
+    _isClosed = false;
   }
 
   @Override
   public byte readByte() throws IOException {
+    ensureOpen();
     tryToFill();
     byte b = _cacheValue.read(_blockPosition);
     _position++;
@@ -67,6 +71,7 @@ public class CacheIndexInput extends IndexInput {
 
   @Override
   public void readBytes(byte[] b, int offset, int len) throws IOException {
+    ensureOpen();
     while (len > 0) {
       tryToFill();
       int remaining = remaining();
@@ -81,6 +86,7 @@ public class CacheIndexInput extends IndexInput {
 
   @Override
   public short readShort() throws IOException {
+    ensureOpen();
     if (_cacheValue != null && remaining() >= 2) {
       short s = _cacheValue.readShort(_blockPosition);
       _blockPosition += 2;
@@ -92,6 +98,7 @@ public class CacheIndexInput extends IndexInput {
 
   @Override
   public int readInt() throws IOException {
+    ensureOpen();
     if (_cacheValue != null && remaining() >= 4) {
       int i = _cacheValue.readInt(_blockPosition);
       _blockPosition += 4;
@@ -103,6 +110,7 @@ public class CacheIndexInput extends IndexInput {
 
   @Override
   public long readLong() throws IOException {
+    ensureOpen();
     if (_cacheValue != null && remaining() >= 8) {
       long l = _cacheValue.readLong(_blockPosition);
       _blockPosition += 8;
@@ -112,6 +120,71 @@ public class CacheIndexInput extends IndexInput {
     return super.readLong();
   }
 
+  @Override
+  public void close() throws IOException {
+    if (!_isClosed) {
+      _isClosed = true;
+      _indexInput.close();
+      releaseCache();
+    }
+  }
+
+  @Override
+  public long getFilePointer() {
+    ensureOpen();
+    return _position;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    ensureOpen();
+    if (pos >= _fileLength) {
+      throw new IOException("Can not seek past end of file [" + pos + "] filelength [" + _fileLength + "]");
+    }
+    if (_position == pos) {
+      // Seeking to same position
+      return;
+    }
+    long oldBlockId = getBlockId();
+    if (_blockPosition == _cacheBlockSize) {
+      // If we are at the end of the current block, but haven't actually fetched
+      // the next block then we are really on the previous.
+      oldBlockId--;
+    }
+    _position = pos;
+    long newBlockId = getBlockId(_position);
+    if (newBlockId == oldBlockId) {
+      // need to set new block position
+      _blockPosition = getBlockPosition();
+    } else {
+      releaseCache();
+    }
+  }
+
+  @Override
+  public long length() {
+    ensureOpen();
+    return _fileLength;
+  }
+
+  @Override
+  public IndexInput clone() {
+    ensureOpen();
+    CacheIndexInput clone = (CacheIndexInput) super.clone();
+    clone._key = _key.clone();
+    clone._indexInput = _indexInput.clone();
+    if (clone._cacheValue != null) {
+      clone._cacheValue.incRef();
+    }
+    clone._quiet = _cache.shouldBeQuiet(_directory, _fileName);
+    return clone;
+  }
+
+  @Override
+  protected void finalize() throws Throwable {
+    close();
+  }
+
   private int remaining() {
     return _cacheValue.length() - _blockPosition;
   }
@@ -182,57 +255,10 @@ public class CacheIndexInput extends IndexInput {
     return _position / _cacheBlockSize;
   }
 
-  @Override
-  public void close() throws IOException {
-    _indexInput.close();
-    releaseCache();
-  }
-
-  @Override
-  public long getFilePointer() {
-    return _position;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    if (pos >= _fileLength) {
-      throw new IOException("Can not seek past end of file [" + pos + "] filelength [" + _fileLength + "]");
-    }
-    if (_position == pos) {
-      // Seeking to same position
-      return;
-    }
-    long oldBlockId = getBlockId();
-    if (_blockPosition == _cacheBlockSize) {
-      // If we are at the end of the current block, but haven't actually fetched
-      // the next block then we are really on the previous.
-      oldBlockId--;
-    }
-    _position = pos;
-    long newBlockId = getBlockId(_position);
-    if (newBlockId == oldBlockId) {
-      // need to set new block position
-      _blockPosition = getBlockPosition();
-    } else {
-      releaseCache();
-    }
-  }
-
-  @Override
-  public long length() {
-    return _fileLength;
-  }
-
-  @Override
-  public IndexInput clone() {
-    CacheIndexInput clone = (CacheIndexInput) super.clone();
-    clone._key = _key.clone();
-    clone._indexInput = _indexInput.clone();
-    if (clone._cacheValue != null) {
-      clone._cacheValue.incRef();
+  private void ensureOpen() {
+    if (_isClosed) {
+      throw new AlreadyClosedException("Already closed: " + this);
     }
-    clone._quiet = _cache.shouldBeQuiet(_directory, _fileName);
-    return clone;
   }
 
 }


Mime
View raw message