incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject	git commit: Changing the hdfs directory resource management to not rely on GC to clean up old resources.
Date Mon, 29 Jun 2015 17:32:51 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master a71e2fe84 -> 2b31cd86f


Changing the hdfs directory resource management to not rely on GC to clean up old resources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/2b31cd86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/2b31cd86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/2b31cd86

Branch: refs/heads/master
Commit: 2b31cd86f8587cf0765649f2028d6c91f86bbb8d
Parents: a71e2fe
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Jun 29 13:32:43 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Jun 29 13:32:43 2015 -0400

----------------------------------------------------------------------
 .../blur/store/hdfs/FSInputFileHandle.java      | 183 ++++++++++++++
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 245 +------------------
 .../apache/blur/store/hdfs/HdfsIndexInput.java  |  27 +-
 .../store/hdfs/HdfsDirectoryResourceTest.java   |  41 ++--
 4 files changed, 218 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2b31cd86/blur-store/src/main/java/org/apache/blur/store/hdfs/FSInputFileHandle.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/FSInputFileHandle.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/FSInputFileHandle.java
new file mode 100644
index 0000000..36a5b64
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/FSInputFileHandle.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.memory.MemoryLeakDetector;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FSInputFileHandle implements Closeable {
+
+  private final FileSystem _fileSystem;
+  private final Path _path;
+  private final Map<String, ManagedFSDataInputSequentialAccess> _seqAccessInputs;
+  private final AtomicBoolean _open = new AtomicBoolean(true);
+  private final ManagedFSDataInputRandomAccess _randomAccess;
+  private final boolean _resourceTracking;
+  private final String _name;
+
+  public FSInputFileHandle(FileSystem fileSystem, Path path, long length, String name, boolean
resourceTracking)
+      throws IOException {
+    _resourceTracking = resourceTracking;
+    _fileSystem = fileSystem;
+    _path = path;
+    _name = name;
+    _seqAccessInputs = new ConcurrentHashMap<String, ManagedFSDataInputSequentialAccess>();
+    FSDataInputStream inputStream = _fileSystem.open(_path);
+    _randomAccess = new ManagedFSDataInputRandomAccess(inputStream, _path, length);
+    trackObject(inputStream, "Random Inputstream", name, path);
+  }
+
+  public FSDataInputSequentialAccess openForSequentialInput() throws IOException {
+    ensureOpen();
+    FSDataInputStream inputStream = _fileSystem.open(_path);
+    ManagedFSDataInputSequentialAccess in = new ManagedFSDataInputSequentialAccess(_path,
inputStream);
+    trackObject(inputStream, "Sequential Inputstream", _name, _path);
+    _seqAccessInputs.put(in.getId(), in);
+    return in;
+  }
+
+  private void ensureOpen() throws IOException {
+    if (!_open.get()) {
+      throw new IOException("Already closed!");
+    }
+  }
+
+  public void sequentialInputReset(FSDataInputSequentialAccess sequentialInput) throws IOException
{
+    ensureOpen();
+    ManagedFSDataInputSequentialAccess input = (ManagedFSDataInputSequentialAccess) sequentialInput;
+    _seqAccessInputs.remove(input._id);
+    sequentialInput.close();
+  }
+
+  public FSDataInputRandomAccess getRandomAccess() {
+    return _randomAccess;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (_open.get()) {
+      _open.set(true);
+      IOUtils.closeQuietly(_randomAccess);
+      for (ManagedFSDataInputSequentialAccess in : _seqAccessInputs.values()) {
+        in.close();
+        IOUtils.closeQuietly(in);
+      }
+      _seqAccessInputs.clear();
+    }
+  }
+
+  static class ManagedFSDataInputSequentialAccess implements FSDataInputSequentialAccess
{
+
+    final Path _path;
+    final FSDataInputStream _input;
+    final String _id;
+
+    ManagedFSDataInputSequentialAccess(Path path, FSDataInputStream input) {
+      _path = path;
+      _input = input;
+      _id = UUID.randomUUID().toString();
+    }
+
+    String getId() {
+      return _id;
+    }
+
+    @Override
+    public void close() throws IOException {
+      _input.close();
+    }
+
+    @Override
+    public void skip(long amount) throws IOException {
+      _input.skip(amount);
+    }
+
+    @Override
+    public void seek(long filePointer) throws IOException {
+      _input.seek(filePointer);
+    }
+
+    @Override
+    public void readFully(byte[] b, int offset, int length) throws IOException {
+      _input.readFully(b, offset, length);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return _input.getPos();
+    }
+
+    @Override
+    public String toString() {
+      return _path.toString();
+    }
+
+  }
+
+  static class ManagedFSDataInputRandomAccess implements FSDataInputRandomAccess {
+    private final FSDataInputStream _inputStream;
+    private final Path _path;
+    private final long _length;
+
+    ManagedFSDataInputRandomAccess(FSDataInputStream inputStream, Path path, long length)
{
+      _inputStream = inputStream;
+      _path = path;
+      _length = length;
+    }
+
+    @Override
+    public void close() throws IOException {
+      _inputStream.close();
+    }
+
+    @Override
+    public int read(long filePointer, byte[] b, int offset, int length) throws IOException
{
+      return _inputStream.read(filePointer, b, offset, length);
+    }
+
+    @Override
+    public String toString() {
+      return _path.toString();
+    }
+
+    @Override
+    public Path getPath() {
+      return _path;
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+  }
+
+  protected <T> void trackObject(T t, String message, Object... args) {
+    if (_resourceTracking) {
+      MemoryLeakDetector.record(t, message, args);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2b31cd86/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 946b16d..cf5fb71 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -24,13 +24,10 @@ import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.lang.ref.WeakReference;
-import java.net.InetAddress;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
@@ -42,7 +39,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
@@ -54,7 +50,6 @@ import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -92,7 +87,6 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
 
   private static final Timer TIMER;
   private static final BlockingQueue<Closeable> CLOSING_QUEUE = new LinkedBlockingQueue<Closeable>();
-  private static final BlockingQueue<WeakRef> WEAK_CLOSING_QUEUE = new LinkedBlockingQueue<WeakRef>();
 
   static class FStat {
     FStat(FileStatus fileStatus) {
@@ -108,33 +102,9 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     final long _length;
   }
 
-  static class StreamPair {
-
-    final FSDataInputStream _random;
-    final FSDataInputStream _stream;
-
-    StreamPair(FSDataInputStream random, FSDataInputStream stream) {
-      _random = random;
-      _stream = stream;
-    }
-
-    void close() {
-      IOUtils.closeQuietly(_random);
-      IOUtils.closeQuietly(_stream);
-    }
-
-    FSDataInputStream getInputStream(boolean stream) {
-      if (stream) {
-        return _stream;
-      }
-      return _random;
-    }
-  }
-
   static {
     TIMER = new Timer("HdfsDirectory-Timer", true);
     TIMER.schedule(getClosingQueueTimerTask(), TimeUnit.SECONDS.toMillis(3), TimeUnit.SECONDS.toMillis(3));
-    TIMER.schedule(getSequentialRefClosingQueueTimerTask(), TimeUnit.SECONDS.toMillis(3),
TimeUnit.SECONDS.toMillis(3));
   }
 
   protected final Path _path;
@@ -145,12 +115,9 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
   protected final Map<String, Path> _symlinkPathMap = new ConcurrentHashMap<String,
Path>();
   protected final Map<String, Boolean> _copyFileMap = new ConcurrentHashMap<String,
Boolean>();
   protected final Map<String, Path> _copyFilePathMap = new ConcurrentHashMap<String,
Path>();
-  protected final Map<String, FSDataInputRandomAccess> _inputMap = new ConcurrentHashMap<String,
FSDataInputRandomAccess>();
   protected final boolean _useCache = true;
   protected final boolean _asyncClosing;
   protected final SequentialReadControl _sequentialReadControl;
-  protected final String _hostname;
-  protected final TimerTask _reportOnBlockLocality;
   protected final boolean _resourceTracking;
 
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
@@ -196,11 +163,6 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
       _metricsGroup = metricsGroup;
     }
 
-    _hostname = InetAddress.getLocalHost().getHostName();
-    LOG.info("Using hostname [{0}] for data locality checks directory [{1}].", _hostname,
_path);
-    _reportOnBlockLocality = reportOnBlockLocality();
-    TIMER.schedule(_reportOnBlockLocality, TimeUnit.SECONDS.toMillis(30), TimeUnit.SECONDS.toMillis(30));
-
     if (_useCache) {
       if (filesToExpose == null) {
         FileStatus[] listStatus = _fileSystem.listStatus(_path);
@@ -224,62 +186,6 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     }
   }
 
-  protected TimerTask reportOnBlockLocality() {
-    final Counter totalHdfsBlock = _metricsGroup.totalHdfsBlock;
-    final Counter localHdfsBlock = _metricsGroup.localHdfsBlock;
-    final AtomicLong prevTotalCount = new AtomicLong();
-    final AtomicLong prevLocalCount = new AtomicLong();
-    return new TimerTask() {
-      @Override
-      public void run() {
-        try {
-          long[] counts = runReport();
-          long total = counts[0];
-          long local = counts[1];
-          long prevTotal = prevTotalCount.get();
-          long prevLocal = prevLocalCount.get();
-
-          totalHdfsBlock.inc(total - prevTotal);
-          localHdfsBlock.inc(local - prevLocal);
-
-          prevTotalCount.set(total);
-          prevLocalCount.set(local);
-        } catch (Exception e) {
-          LOG.error("Unknown error.", e);
-        }
-      }
-    };
-  }
-
-  protected long[] runReport() throws IOException {
-    long total = 0;
-    long local = 0;
-    Collection<FSDataInputRandomAccess> values = _inputMap.values();
-    for (FSDataInputRandomAccess inputRandomAccess : values) {
-      Path path = inputRandomAccess.getPath();
-      long length = inputRandomAccess.length();
-      FileStatus fileStatus = _fileSystem.getFileStatus(path);
-      BlockLocation[] blockLocations = _fileSystem.getFileBlockLocations(fileStatus, 0L,
length);
-      for (BlockLocation blockLocation : blockLocations) {
-        if (isLocal(blockLocation)) {
-          local++;
-        }
-        total++;
-      }
-    }
-    return new long[] { total, local };
-  }
-
-  private boolean isLocal(BlockLocation blockLocation) throws IOException {
-    String[] hosts = blockLocation.getHosts();
-    for (String host : hosts) {
-      if (host.equals(_hostname)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   private void addToCache(FileStatus fileStatus) throws IOException {
     if (!fileStatus.isDir()) {
       Path p = fileStatus.getPath();
@@ -301,22 +207,6 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     }
   }
 
-  private static TimerTask getSequentialRefClosingQueueTimerTask() {
-    return new TimerTask() {
-      @Override
-      public void run() {
-        Iterator<WeakRef> iterator = WEAK_CLOSING_QUEUE.iterator();
-        while (iterator.hasNext()) {
-          WeakRef weakRef = iterator.next();
-          if (weakRef.isClosable()) {
-            iterator.remove();
-            CLOSING_QUEUE.add(weakRef._closeable);
-          }
-        }
-      }
-    };
-  }
-
   private static TimerTask getClosingQueueTimerTask() {
     return new TimerTask() {
       @Override
@@ -413,7 +303,6 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
         } else {
           outputStream.close();
         }
-        openForInput(name, length);
       }
 
       @Override
@@ -446,67 +335,13 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
       throw new FileNotFoundException("File [" + name + "] not found.");
     }
     long fileLength = fileLength(name);
-    FSDataInputRandomAccess inputRandomAccess = openForInput(name, fileLength);
-    HdfsIndexInput input = new HdfsIndexInput(this, inputRandomAccess, fileLength, _metricsGroup,
name,
+    Path path = getPath(name);
+    FSInputFileHandle fsInputFileHandle = new FSInputFileHandle(_fileSystem, path, fileLength,
name, _resourceTracking);
+    HdfsIndexInput input = new HdfsIndexInput(this, fsInputFileHandle, fileLength, _metricsGroup,
name,
         _sequentialReadControl.clone());
     return input;
   }
 
-  protected synchronized FSDataInputRandomAccess openForInput(String name, long length) throws
IOException {
-    final Path path = getPath(name);
-    FSDataInputRandomAccess input = _inputMap.get(name);
-    if (input != null) {
-      return input;
-    }
-    Tracer trace = Trace.trace("filesystem - open", Trace.param("path", path));
-    try {
-      final FSDataInputStream inputStream = _fileSystem.open(path);
-      trackObject(inputStream, "Random Inputstream", name, path);
-      FSDataInputRandomAccess randomInputStream = new HdfsFSDataInputRandomAccess(inputStream,
path, length);
-      _inputMap.put(name, randomInputStream);
-      return randomInputStream;
-    } finally {
-      trace.done();
-    }
-  }
-
-  static class HdfsFSDataInputRandomAccess implements FSDataInputRandomAccess {
-    private final FSDataInputStream _inputStream;
-    private final Path _path;
-    private final long _length;
-
-    HdfsFSDataInputRandomAccess(FSDataInputStream inputStream, Path path, long length) {
-      _inputStream = inputStream;
-      _path = path;
-      _length = length;
-    }
-
-    @Override
-    public void close() throws IOException {
-      _inputStream.close();
-    }
-
-    @Override
-    public int read(long filePointer, byte[] b, int offset, int length) throws IOException
{
-      return _inputStream.read(filePointer, b, offset, length);
-    }
-
-    @Override
-    public String toString() {
-      return _path.toString();
-    }
-
-    @Override
-    public Path getPath() {
-      return _path;
-    }
-
-    @Override
-    public long length() {
-      return _length;
-    }
-  }
-
   @Override
   public String[] listAll() throws IOException {
     LOG.debug("listAll [{0}]", getPath());
@@ -576,13 +411,7 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
   }
 
   protected void delete(String name) throws IOException {
-    FSDataInputRandomAccess inputStream = _inputMap.remove(name);
     Tracer trace = Trace.trace("filesystem - delete", Trace.param("path", getPath(name)));
-    if (inputStream != null) {
-      IOUtils.closeQuietly(inputStream);
-    } else {
-      LOG.error("Strange problem, random access input was not found for [{0}]", name);
-    }
     if (_useCache) {
       _symlinkMap.remove(name);
       _symlinkPathMap.remove(name);
@@ -634,7 +463,6 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
 
   @Override
   public void close() throws IOException {
-    _reportOnBlockLocality.cancel();
     TIMER.purge();
   }
 
@@ -779,71 +607,4 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     return this;
   }
 
-  protected FSDataInputSequentialAccess openForSequentialInput(String name, Object key) throws
IOException {
-    return openInputStream(name, key, _fileSystem);
-  }
-
-  protected FSDataInputSequentialAccess openInputStream(String name, Object key, FileSystem
fileSystem)
-      throws IOException {
-    final Path path = getPath(name);
-    final FSDataInputStream input = fileSystem.open(path);
-    trackObject(input, "Sequential Inputstream", name, _path);
-    FSDataInputSequentialAccess sequentialAccess = toFSDataInputSequentialAccess(path, input);
-    WEAK_CLOSING_QUEUE.add(new WeakRef(sequentialAccess, key));
-    return sequentialAccess;
-  }
-
-  private static FSDataInputSequentialAccess toFSDataInputSequentialAccess(final Path path,
-      final FSDataInputStream input) {
-    return new FSDataInputSequentialAccess() {
-
-      @Override
-      public void close() throws IOException {
-        input.close();
-      }
-
-      @Override
-      public void skip(long amount) throws IOException {
-        input.skip(amount);
-      }
-
-      @Override
-      public void seek(long filePointer) throws IOException {
-        input.seek(filePointer);
-      }
-
-      @Override
-      public void readFully(byte[] b, int offset, int length) throws IOException {
-        input.readFully(b, offset, length);
-      }
-
-      @Override
-      public long getPos() throws IOException {
-        return input.getPos();
-      }
-
-      @Override
-      public String toString() {
-        return path.toString();
-      }
-
-    };
-  }
-
-  static class WeakRef {
-
-    final Closeable _closeable;
-    final WeakReference<Object> _ref;
-
-    WeakRef(Closeable closeable, Object key) {
-      _closeable = closeable;
-      _ref = new WeakReference<Object>(key);
-    }
-
-    boolean isClosable() {
-      return _ref.get() == null ? true : false;
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2b31cd86/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
index 6c4e939..d45ae04 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
@@ -30,25 +30,21 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
   private static final Log LOG = LogFactory.getLog(HdfsIndexInput.class);
 
   private final long _length;
-  private final FSDataInputRandomAccess _input;
   private final MetricsGroup _metricsGroup;
-  private final String _name;
-  private final HdfsDirectory _dir;
+  private final FSInputFileHandle _inputFileHandle;
 
   private SequentialReadControl _sequentialReadControl;
-
-  private long _prevFilePointer;
   private FSDataInputSequentialAccess _sequentialInput;
+  private boolean _clone;
+  private long _prevFilePointer;
 
-  public HdfsIndexInput(HdfsDirectory dir, FSDataInputRandomAccess input, long length, MetricsGroup
metricsGroup,
+  public HdfsIndexInput(HdfsDirectory dir, FSInputFileHandle inputFileHandle, long length,
MetricsGroup metricsGroup,
       String name, SequentialReadControl sequentialReadControl) throws IOException {
-    super("HdfsIndexInput(" + name + "@" + "" + input + ")");
+    super("HdfsIndexInput(" + name + "@" + "" + inputFileHandle + ")");
     _sequentialReadControl = sequentialReadControl;
-    _dir = dir;
-    _input = input;
     _length = length;
     _metricsGroup = metricsGroup;
-    _name = name;
+    _inputFileHandle = inputFileHandle;
   }
 
   @Override
@@ -79,6 +75,8 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
           LOG.debug("Current Pos [{0}] Prev Pos [{1}] Diff [{2}]", filePointer, _prevFilePointer,
filePointer
               - _prevFilePointer);
           _sequentialReadControl.reset();
+          _inputFileHandle.sequentialInputReset(_sequentialInput);
+          _sequentialInput = null;
         }
       }
     }
@@ -87,7 +85,7 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
       if (_sequentialInput == null) {
         Tracer trace = Trace.trace("filesystem - read - openForSequentialInput", Trace.param("file",
toString()),
             Trace.param("location", getFilePointer()));
-        _sequentialInput = _dir.openForSequentialInput(_name, this);
+        _sequentialInput = _inputFileHandle.openForSequentialInput();
         trace.done();
       }
     }
@@ -112,7 +110,7 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
       int olen = length;
       while (length > 0) {
         int amount;
-        amount = _input.read(filePointer, b, offset, length);
+        amount = _inputFileHandle.getRandomAccess().read(filePointer, b, offset, length);
         length -= amount;
         offset += amount;
         filePointer += amount;
@@ -132,11 +130,14 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
     clone._sequentialInput = null;
     clone._sequentialReadControl = _sequentialReadControl.clone();
     clone._sequentialReadControl.reset();
+    clone._clone = true;
     return clone;
   }
 
   @Override
   protected void closeInternal() throws IOException {
-
+    if (!_clone) {
+      _inputFileHandle.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2b31cd86/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
index ce00edc..7496306 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
@@ -37,6 +37,9 @@ import org.junit.Test;
 
 public class HdfsDirectoryResourceTest {
 
+  private static final long WRITE_SIZE = 10000000L;
+  private static final long READ_SIZE = 2000000L;
+
   private static final int WAIT_TIME_IN_SECONDS = 10000;
 
   private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir",
@@ -69,54 +72,46 @@ public class HdfsDirectoryResourceTest {
     HdfsDirectory dir = new HdfsDirectory(_configuration, path, null, null, resourceTracking);
     try {
       String name = "_1.file";
-      exeucteWrites(dir, name);
+      executeWrites(dir, name);
       executeReads(dir, name);
-      assertTrue(waitForSeqReadsToClose());
+      assertResourceCount(0);
       dir.deleteFile(name);
-      assertTrue(waitForRandomAccessToClose());
     } finally {
       dir.close();
     }
   }
 
-  private void exeucteWrites(HdfsDirectory dir, String name) throws IOException {
+  private void executeWrites(HdfsDirectory dir, String name) throws IOException {
     IndexOutput output = dir.createOutput(name, IOContext.DEFAULT);
-    writeData(output, 100000000L);
+    writeData(output, WRITE_SIZE);
     output.close();
   }
 
-  private void executeReads(HdfsDirectory dir, String name) throws IOException {
+  private void executeReads(HdfsDirectory dir, String name) throws IOException, InterruptedException
{
     IndexInput input = dir.openInput(name, IOContext.READ);
+    assertResourceCount(1);
     input.readLong();
     input.seek(0L);
     for (int i = 0; i < 2; i++) {
-      readSeq(input.clone(), 20000000L);
+      readSeq(input.clone(), READ_SIZE);
+      assertResourceCount(1 + i + 1);
     }
     input.close();
   }
 
-  private boolean waitForRandomAccessToClose() throws InterruptedException {
-    for (int i = 0; i < WAIT_TIME_IN_SECONDS; i++) {
-      Thread.sleep(1000);
-      Runtime.getRuntime().gc();
-      Runtime.getRuntime().gc();
-      if (MemoryLeakDetector.isEmpty()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean waitForSeqReadsToClose() throws InterruptedException {
+  private void assertResourceCount(int count) throws InterruptedException {
     for (int i = 0; i < WAIT_TIME_IN_SECONDS; i++) {
       Thread.sleep(1000);
       Runtime.getRuntime().gc();
       Runtime.getRuntime().gc();
-      if (MemoryLeakDetector.getCount() == 1) {
-        return true;
+      int memLeakDet = MemoryLeakDetector.getCount();
+      if (memLeakDet == count) {
+        return;
+      } else {
+        System.out.println("MemoryLeakDetector [" + memLeakDet + "] assertion [" + count
+ "]");
       }
     }
-    return false;
+    fail();
   }
 
   private void readSeq(IndexInput input, long read) throws IOException {


Mime
View raw message