incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [4/4] git commit: Work around for HDFS-9104.
Date Mon, 21 Sep 2015 14:43:05 GMT
Work around for HDFS-9104.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/af67f6c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/af67f6c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/af67f6c2

Branch: refs/heads/master
Commit: af67f6c2bf6f8d9d7e1794c5567390c248d6bd32
Parents: b57fd24
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Sep 21 10:42:45 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Sep 21 10:42:45 2015 -0400

----------------------------------------------------------------------
 .../blur/store/hdfs/FSInputFileHandle.java      | 48 ++++++++++++++++++--
 .../apache/blur/store/hdfs/HdfsDirectory.java   |  4 +-
 2 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/af67f6c2/blur-store/src/main/java/org/apache/blur/store/hdfs/FSInputFileHandle.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/FSInputFileHandle.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/FSInputFileHandle.java
index 36a5b64..3835eea 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/FSInputFileHandle.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/FSInputFileHandle.java
@@ -21,9 +21,13 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 import org.apache.blur.memory.MemoryLeakDetector;
+import org.apache.blur.store.hdfs_v2.HdfsUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,6 +35,8 @@ import org.apache.hadoop.fs.Path;
 
 public class FSInputFileHandle implements Closeable {
 
+  private static final Log LOG = LogFactory.getLog(FSInputFileHandle.class);
+
   private final FileSystem _fileSystem;
   private final Path _path;
   private final Map<String, ManagedFSDataInputSequentialAccess> _seqAccessInputs;
@@ -39,18 +45,52 @@ public class FSInputFileHandle implements Closeable {
   private final boolean _resourceTracking;
   private final String _name;
 
-  public FSInputFileHandle(FileSystem fileSystem, Path path, long length, String name, boolean resourceTracking)
-      throws IOException {
+  public FSInputFileHandle(FileSystem fileSystem, Path path, long expectedLength, String name,
+      boolean resourceTracking, boolean checkFileLength) throws IOException {
     _resourceTracking = resourceTracking;
     _fileSystem = fileSystem;
     _path = path;
     _name = name;
     _seqAccessInputs = new ConcurrentHashMap<String, ManagedFSDataInputSequentialAccess>();
-    FSDataInputStream inputStream = _fileSystem.open(_path);
-    _randomAccess = new ManagedFSDataInputRandomAccess(inputStream, _path, length);
+    FSDataInputStream inputStream;
+    if (checkFileLength) {
+      final long start = System.nanoTime();
+      while (true) {
+        inputStream = _fileSystem.open(_path);
+        long actualLength = getFileLength(inputStream);
+        if (actualLength < expectedLength) {
+          inputStream.close();
+          if (start + TimeUnit.SECONDS.toNanos(30) < System.nanoTime()) {
+            throw new IOException("File path [" + path
+                + "] has taken too long to update file length in namenode, expected [" + expectedLength + "] actual ["
+                + actualLength + "].");
+          }
+          LOG.info("Input stream length is incorrect for file [{0}], expected [{1}] actual [{2}]", path,
+              expectedLength, actualLength);
+          try {
+            Thread.sleep(250);
+          } catch (InterruptedException e) {
+            throw new IOException(e);
+          }
+        } else if (actualLength == expectedLength) {
+          // We are good !
+          break;
+        } else {
+          throw new IOException("File length expected [" + expectedLength + "] is less than the actual file length ["
+              + actualLength + "] for file [" + path + "]");
+        }
+      }
+    } else {
+      inputStream = _fileSystem.open(_path);
+    }
+    _randomAccess = new ManagedFSDataInputRandomAccess(inputStream, _path, expectedLength);
     trackObject(inputStream, "Random Inputstream", name, path);
   }
 
+  private long getFileLength(FSDataInputStream inputStream) throws IOException {
+    return HdfsUtils.getDFSLength(inputStream);
+  }
+
   public FSDataInputSequentialAccess openForSequentialInput() throws IOException {
     ensureOpen();
     FSDataInputStream inputStream = _fileSystem.open(_path);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/af67f6c2/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 6240660..6bcba98 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -225,7 +225,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
       if (_fileSystem.rename(_newManifestTmp, _newManifest)) {
         _fileSystem.delete(_manifest, false);
         if (_fileSystem.rename(_newManifest, _manifest)) {
-          LOG.info("Manifest sync complete for [{0}]", _manifest);
+          LOG.debug("Manifest sync complete for [{0}]", _manifest);
         } else {
           throw new IOException("Could not rename [" + _newManifest + "] to [" + _manifest + "]");
         }
@@ -478,7 +478,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     }
     long fileLength = fileLength(name);
     Path path = getPath(name);
-    FSInputFileHandle fsInputFileHandle = new FSInputFileHandle(_fileSystem, path, fileLength, name, _resourceTracking);
+    FSInputFileHandle fsInputFileHandle = new FSInputFileHandle(_fileSystem, path, fileLength, name, _resourceTracking, _asyncClosing && _useCache);
     HdfsIndexInput input = new HdfsIndexInput(this, fsInputFileHandle, fileLength, _metricsGroup, name,
         _sequentialReadControl.clone());
     return input;


Mime
View raw message