incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/3] git commit: Swapping FSDataInputStream reference in the HdfsIndexInput to be interfaces. This will allow for references of locally copied files to be swapped out after they are relocated for local reads.
Date Sun, 05 Apr 2015 21:21:07 GMT
Swapping FSDataInputStream reference in the HdfsIndexInput to be interfaces.  This will allow
for references of locally copied files to be swapped out after they are relocated for local
reads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/08565d13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/08565d13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/08565d13

Branch: refs/heads/master
Commit: 08565d13dbf4d23b599018c28b06eb72ef81f639
Parents: 7e2ada2
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Apr 5 17:21:39 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Apr 5 17:21:39 2015 -0400

----------------------------------------------------------------------
 .../store/hdfs/FSDataInputRandomAccess.java     | 26 ++++++++
 .../store/hdfs/FSDataInputSequentialAccess.java | 32 ++++++++++
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 66 +++++++++++++++-----
 .../apache/blur/store/hdfs/HdfsIndexInput.java  | 27 ++++----
 4 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/08565d13/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputRandomAccess.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputRandomAccess.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputRandomAccess.java
new file mode 100644
index 0000000..673a713
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputRandomAccess.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface FSDataInputRandomAccess extends Closeable {
+
+  int read(long filePointer, byte[] b, int offset, int length) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/08565d13/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputSequentialAccess.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputSequentialAccess.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputSequentialAccess.java
new file mode 100644
index 0000000..77de014
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputSequentialAccess.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface FSDataInputSequentialAccess extends Closeable {
+
+  void skip(long amount) throws IOException;
+
+  long getPos() throws IOException;
+
+  void seek(long filePointer) throws IOException;
+
+  void readFully(byte[] b, int offset, int length) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/08565d13/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 5c9f8ea..befcca9 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -132,7 +132,7 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
   protected final Map<String, FStat> _fileStatusMap = new ConcurrentHashMap<String,
FStat>();
   protected final Map<String, Boolean> _symlinkMap = new ConcurrentHashMap<String,
Boolean>();
   protected final Map<String, Path> _symlinkPathMap = new ConcurrentHashMap<String,
Path>();
-  protected final Map<Path, FSDataInputStream> _inputMap = new ConcurrentHashMap<Path,
FSDataInputStream>();
+  protected final Map<Path, FSDataInputRandomAccess> _inputMap = new ConcurrentHashMap<Path,
FSDataInputRandomAccess>();
   protected final boolean _useCache = true;
   protected final boolean _asyncClosing;
   protected final Path _localCachePath = new Path("/tmp/cache");
@@ -192,7 +192,7 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
           WeakRef weakRef = iterator.next();
           if (weakRef.isClosable()) {
             iterator.remove();
-            CLOSING_QUEUE.add(weakRef._inputStream);
+            CLOSING_QUEUE.add(weakRef._closeable);
           }
         }
       }
@@ -308,7 +308,7 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     if (!fileExists(name)) {
       throw new FileNotFoundException("File [" + name + "] not found.");
     }
-    FSDataInputStream inputRandomAccess = openForInput(name);
+    FSDataInputRandomAccess inputRandomAccess = openForInput(name);
     long fileLength = fileLength(name);
     Path path = getPath(name);
     HdfsIndexInput input = new HdfsIndexInput(this, inputRandomAccess, fileLength, _metricsGroup,
path,
@@ -316,15 +316,27 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     return input;
   }
 
-  protected synchronized FSDataInputStream openForInput(String name) throws IOException {
+  protected synchronized FSDataInputRandomAccess openForInput(String name) throws IOException
{
     Path path = getPath(name);
-    FSDataInputStream input = _inputMap.get(path);
+    FSDataInputRandomAccess input = _inputMap.get(path);
     if (input != null) {
       return input;
     }
     Tracer trace = Trace.trace("filesystem - open", Trace.param("path", path));
     try {
-      FSDataInputStream randomInputStream = _fileSystem.open(path);
+      final FSDataInputStream inputStream = _fileSystem.open(path);
+      FSDataInputRandomAccess randomInputStream = new FSDataInputRandomAccess() {
+
+        @Override
+        public void close() throws IOException {
+          inputStream.close();
+        }
+
+        @Override
+        public int read(long filePointer, byte[] b, int offset, int length) throws IOException
{
+          return inputStream.read(filePointer, b, offset, length);
+        }
+      };
       _inputMap.put(path, randomInputStream);
       return randomInputStream;
     } finally {
@@ -397,7 +409,7 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
 
   protected void delete(String name) throws IOException {
     Path path = getPathOrSymlinkForDelete(name);
-    FSDataInputStream inputStream = _inputMap.remove(path);
+    FSDataInputRandomAccess inputStream = _inputMap.remove(path);
     Tracer trace = Trace.trace("filesystem - delete", Trace.param("path", path));
     if (inputStream != null) {
       IOUtils.closeQuietly(inputStream);
@@ -579,23 +591,49 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     return this;
   }
 
-  protected FSDataInputStream openForSequentialInput(Path p, Object key) throws IOException
{
+  protected FSDataInputSequentialAccess openForSequentialInput(Path p, Object key) throws
IOException {
     return openInputStream(_fileSystem, p, key);
   }
 
-  protected FSDataInputStream openInputStream(FileSystem fileSystem, Path p, Object key)
throws IOException {
-    FSDataInputStream input = fileSystem.open(p);
+  protected FSDataInputSequentialAccess openInputStream(FileSystem fileSystem, Path p, Object
key) throws IOException {
+    final FSDataInputStream input = fileSystem.open(p);
     WEAK_CLOSING_QUEUE.add(new WeakRef(input, key));
-    return input;
+    return new FSDataInputSequentialAccess() {
+
+      @Override
+      public void close() throws IOException {
+        input.close();
+      }
+
+      @Override
+      public void skip(long amount) throws IOException {
+        input.skip(amount);
+      }
+
+      @Override
+      public void seek(long filePointer) throws IOException {
+        input.seek(filePointer);
+      }
+
+      @Override
+      public void readFully(byte[] b, int offset, int length) throws IOException {
+        input.readFully(b, offset, length);
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return input.getPos();
+      }
+    };
   }
 
   static class WeakRef {
 
-    final FSDataInputStream _inputStream;
+    final Closeable _closeable;
     final WeakReference<Object> _ref;
 
-    WeakRef(FSDataInputStream input, Object key) {
-      _inputStream = input;
+    WeakRef(Closeable closeable, Object key) {
+      _closeable = closeable;
       _ref = new WeakReference<Object>(key);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/08565d13/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
index bcbd26f..3abff1e 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
@@ -23,7 +23,6 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.store.IndexInput;
 
@@ -32,7 +31,7 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
   private static final Log LOG = LogFactory.getLog(HdfsIndexInput.class);
 
   private final long _length;
-  private final FSDataInputStream _inputStream;
+  private final FSDataInputRandomAccess _input;
   private final MetricsGroup _metricsGroup;
   private final Path _path;
   private final HdfsDirectory _dir;
@@ -40,14 +39,14 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
   private SequentialReadControl _sequentialReadControl;
 
   private long _prevFilePointer;
-  private FSDataInputStream _sequentialInputStream;
+  private FSDataInputSequentialAccess _sequentialInput;
 
-  public HdfsIndexInput(HdfsDirectory dir, FSDataInputStream inputStream, long length, MetricsGroup
metricsGroup,
+  public HdfsIndexInput(HdfsDirectory dir, FSDataInputRandomAccess inputStream, long length,
MetricsGroup metricsGroup,
       Path path, SequentialReadControl sequentialReadControl) throws IOException {
     super("HdfsIndexInput(" + path.toString() + ")");
     _sequentialReadControl = sequentialReadControl;
     _dir = dir;
-    _inputStream = inputStream;
+    _input = inputStream;
     _length = length;
     _metricsGroup = metricsGroup;
     _path = path;
@@ -81,7 +80,7 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
         // "]");
 
         if (_sequentialReadControl.shouldSkipInput(filePointer, _prevFilePointer)) {
-          _sequentialInputStream.skip(filePointer - _prevFilePointer);
+          _sequentialInput.skip(filePointer - _prevFilePointer);
         } else {
           LOG.debug("Current Pos [{0}] Prev Pos [{1}] Diff [{2}]", filePointer, _prevFilePointer,
filePointer
               - _prevFilePointer);
@@ -91,20 +90,20 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
     if (_sequentialReadControl.switchToSequentialRead()) {
 
       _sequentialReadControl.setEnabled(true);
-      if (_sequentialInputStream == null) {
+      if (_sequentialInput == null) {
         Tracer trace = Trace.trace("filesystem - read - openForSequentialInput", Trace.param("file",
_path),
             Trace.param("location", getFilePointer()));
-        _sequentialInputStream = _dir.openForSequentialInput(_path, this);
+        _sequentialInput = _dir.openForSequentialInput(_path, this);
         trace.done();
       }
     }
     if (_sequentialReadControl.isEnabled()) {
-      long pos = _sequentialInputStream.getPos();
+      long pos = _sequentialInput.getPos();
       if (pos != filePointer) {
-        _sequentialInputStream.seek(filePointer);
+        _sequentialInput.seek(filePointer);
       }
-      _sequentialInputStream.readFully(b, offset, length);
-      filePointer = _sequentialInputStream.getPos();
+      _sequentialInput.readFully(b, offset, length);
+      filePointer = _sequentialInput.getPos();
       // @TODO add metrics back
     } else {
       filePointer = randomAccessRead(b, offset, length, start, filePointer);
@@ -119,7 +118,7 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
       int olen = length;
       while (length > 0) {
         int amount;
-        amount = _inputStream.read(filePointer, b, offset, length);
+        amount = _input.read(filePointer, b, offset, length);
         length -= amount;
         offset += amount;
         filePointer += amount;
@@ -136,7 +135,7 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
   @Override
   public IndexInput clone() {
     HdfsIndexInput clone = (HdfsIndexInput) super.clone();
-    clone._sequentialInputStream = null;
+    clone._sequentialInput = null;
     clone._sequentialReadControl = _sequentialReadControl.clone();
     return clone;
   }


Mime
View raw message