incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject	git commit: Adding another stage to allow for recopying of HDFS files to improve local reads after MR imports or shard failover events.
Date Tue, 07 Apr 2015 14:14:09 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 96592cc32 -> 5657f1b09


Adding another stage to allow for recopying of HDFS files to improve local reads after MR imports or shard failover events.
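
The mechanism: after a MapReduce bulk import or a shard failover, a shard server often ends
up reading index files whose HDFS blocks live on remote datanodes. Rewriting a file from the
shard server places the first replica of each new block on the local datanode (standard HDFS
write placement), so subsequent reads become local. The copy is first written to a ".tmp"
file, then renamed to a ".copy" file whose name encodes the original file's last-modified
time, and open readers are switched to the new copy. Below is a minimal sketch of the naming
scheme used in the diff that follows; the helper class and method names are illustrative,
not part of the commit:

    // Sketch of the copy-file naming scheme: <name>~<lastModified>_<copyTime>.copy
    public class CopyFileNaming {

      static final String COPY = ".copy";

      // Encode: "file1.txt" with lastModified=1000, copied at t=2000,
      // becomes "file1.txt~1000_2000.copy".
      static String toCopyName(String name, long lastModified, long copyTime) {
        return name + "~" + lastModified + "_" + copyTime + COPY;
      }

      // Decode the original last-modified time (what getLastModTimeFromCopyFile does).
      static long lastModOf(String copyName) {
        String lastModPlusTs = copyName.substring(copyName.indexOf('~') + 1, copyName.lastIndexOf(COPY));
        return Long.parseLong(lastModPlusTs.substring(0, lastModPlusTs.indexOf('_')));
      }

      // Recover the logical file name (what getRealFileName does for COPY files).
      static String realNameOf(String copyName) {
        return copyName.substring(0, copyName.lastIndexOf('~'));
      }

      public static void main(String[] args) {
        String copy = toCopyName("file1.txt", 1000L, 2000L);
        System.out.println(copy);             // file1.txt~1000_2000.copy
        System.out.println(lastModOf(copy));  // 1000
        System.out.println(realNameOf(copy)); // file1.txt
      }
    }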


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/5657f1b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/5657f1b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/5657f1b0

Branch: refs/heads/master
Commit: 5657f1b09d3733aff390917a3765a9f47f94526a
Parents: 96592cc
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Apr 7 10:14:31 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Apr 7 10:14:31 2015 -0400

----------------------------------------------------------------------
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 213 ++++++++++++++++---
 .../store/hdfs/HdfsDirectoryCopyFileTest.java   |  99 +++++++++
 2 files changed, 277 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5657f1b0/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index f663251..90eff3f 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -26,13 +26,16 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.ref.WeakReference;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.TreeSet;
 import java.util.WeakHashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -70,6 +73,8 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   private static final Log LOG = LogFactory.getLog(HdfsDirectory.class);
 
   public static final String LNK = ".lnk";
+  public static final String COPY = ".copy";
+  public static final String TMP = ".tmp";
 
   private static final String UTF_8 = "UTF-8";
   private static final String HDFS_SCHEMA = "hdfs";
@@ -132,8 +137,10 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   protected final Map<String, FStat> _fileStatusMap = new ConcurrentHashMap<String, FStat>();
   protected final Map<String, Boolean> _symlinkMap = new ConcurrentHashMap<String, Boolean>();
   protected final Map<String, Path> _symlinkPathMap = new ConcurrentHashMap<String, Path>();
-  protected final Map<Path, FSDataInputRandomAccess> _inputMap = new ConcurrentHashMap<Path, FSDataInputRandomAccess>();
-  protected final boolean _useCache = true;
+  protected final Map<String, Boolean> _copyFileMap = new ConcurrentHashMap<String, Boolean>();
+  protected final Map<String, Path> _copyFilePathMap = new ConcurrentHashMap<String, Path>();
+  protected final Map<String, FSDataInputRandomAccess> _inputMap = new ConcurrentHashMap<String, FSDataInputRandomAccess>();
+  protected final boolean _useCache = false;
   protected final boolean _asyncClosing;
   protected final SequentialReadControl _sequentialReadControl;

@@ -174,6 +181,10 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
             Path resolvedPath = getPath(resolvedName);
             FileStatus resolvedFileStatus = _fileSystem.getFileStatus(resolvedPath);
             _fileStatusMap.put(resolvedName, new FStat(resolvedFileStatus));
+          } else if (name.endsWith(COPY)) {
+            String resolvedName = getRealFileName(name);
+            long lastModTime = getLastModTimeFromCopyFile(name);
+            _fileStatusMap.put(resolvedName, new FStat(lastModTime, fileStatus.getLen()));
           } else {
             _fileStatusMap.put(name, new FStat(fileStatus));
           }
@@ -182,6 +193,15 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     }
   }
 
+  protected long getLastModTimeFromCopyFile(String name) {
+    int indexOf = name.indexOf('~');
+    int lastIndexOf = name.lastIndexOf(COPY);
+    String lastModPlusTs = name.substring(indexOf + 1, lastIndexOf);
+    int index = lastModPlusTs.indexOf('_');
+    String lastMod = lastModPlusTs.substring(0, index);
+    return Long.parseLong(lastMod);
+  }
+
   private static TimerTask getSequentialRefClosingQueueTimerTask() {
     return new TimerTask() {
       @Override
@@ -218,6 +238,9 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     if (name.endsWith(LNK)) {
       int lastIndexOf = name.lastIndexOf(LNK);
       return name.substring(0, lastIndexOf);
+    } else if (name.endsWith(COPY)) {
+      int lastIndexOf = name.lastIndexOf('~');
+      return name.substring(0, lastIndexOf);
     }
     return name;
   }
@@ -275,6 +298,8 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
       public void close() throws IOException {
         super.close();
         _fileStatusMap.put(name, new FStat(System.currentTimeMillis(), outputStream.getPos()));
+        // This exists because HDFS is so slow to close files. There are
+        // built-in sleeps during the close call.
         if (_asyncClosing) {
           outputStream.sync();
           CLOSING_QUEUE.add(outputStream);
@@ -301,6 +326,37 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     }
   }
 
+  public void runHdfsCopyFile(String name) throws IOException {
+    Path path = getPath(name);
+    FSDataInputStream inputStream = _fileSystem.open(path);
+
+    Path copyOutputTmpPath = getCopyOutputTmpPath(name);
+    FSDataOutputStream outputStream = _fileSystem.create(copyOutputTmpPath);
+    IOUtils.copy(inputStream, outputStream);
+    inputStream.close();
+    outputStream.close();
+
+    Path copyOutputPath = getCopyOutputPath(name);
+    if (_fileSystem.rename(copyOutputTmpPath, copyOutputPath)) {
+      FSDataInputStream newInput = _fileSystem.open(copyOutputPath);
+      FSDataInputRandomAccess oldInput = _inputMap.put(name, toFSDataInputRandomAccess(copyOutputPath, newInput));
+      IOUtils.closeQuietly(oldInput); // May be null if the file was never opened for reading.
+    } else {
+      LOG.error("Unknown error while trying to commit copy file [{0}]", copyOutputPath);
+      _fileSystem.delete(copyOutputTmpPath, false);
+    }
+  }
+
+  private Path getCopyOutputTmpPath(String name) throws IOException {
+    long fileModified = getFileModified(name);
+    return new Path(_path, name + "~" + fileModified + "_" + System.currentTimeMillis() + TMP);
+  }
+
+  protected Path getCopyOutputPath(String name) throws IOException {
+    long fileModified = getFileModified(name);
+    return new Path(_path, name + "~" + fileModified + "_" + System.currentTimeMillis() + COPY);
+  }
+
   @Override
   public IndexInput openInput(String name, IOContext context) throws IOException {
     LOG.debug("openInput [{0}] [{1}] [{2}]", name, context, getPath());
@@ -323,31 +379,36 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     Tracer trace = Trace.trace("filesystem - open", Trace.param("path", path));
     try {
       final FSDataInputStream inputStream = _fileSystem.open(path);
-      FSDataInputRandomAccess randomInputStream = new FSDataInputRandomAccess() {
-
-        @Override
-        public void close() throws IOException {
-          inputStream.close();
-        }
-
-        @Override
-        public int read(long filePointer, byte[] b, int offset, int length) throws IOException {
-          return inputStream.read(filePointer, b, offset, length);
-        }
-
-        @Override
-        public String toString() {
-          return path.toString();
-        }
-
-      };
-      _inputMap.put(path, randomInputStream);
+      FSDataInputRandomAccess randomInputStream = toFSDataInputRandomAccess(path, inputStream);
+      _inputMap.put(name, randomInputStream);
       return randomInputStream;
     } finally {
       trace.done();
     }
   }
 
+  private FSDataInputRandomAccess toFSDataInputRandomAccess(final Path path, final FSDataInputStream inputStream) {
+    FSDataInputRandomAccess randomInputStream = new FSDataInputRandomAccess() {
+
+      @Override
+      public void close() throws IOException {
+        inputStream.close();
+      }
+
+      @Override
+      public int read(long filePointer, byte[] b, int offset, int length) throws IOException {
+        return inputStream.read(filePointer, b, offset, length);
+      }
+
+      @Override
+      public String toString() {
+        return path.toString();
+      }
+
+    };
+    return randomInputStream;
+  }
+
   @Override
   public String[] listAll() throws IOException {
     LOG.debug("listAll [{0}]", getPath());
@@ -369,11 +430,18 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
           }
         }
       });
-      String[] result = new String[files.length];
-      for (int i = 0; i < result.length; i++) {
-        result[i] = files[i].getPath().getName();
+      SortedSet<String> result = new TreeSet<String>();
+      for (int i = 0; i < files.length; i++) {
+        String name = files[i].getPath().getName();
+        if (name.endsWith(LNK)) {
+          result.add(getRealFileName(name));
+        } else if (name.endsWith(COPY)) {
+          result.add(getRealFileName(name));
+        } else {
+          result.add(name);
+        }
       }
-      return result;
+      return result.toArray(new String[result.size()]);
     } finally {
       trace.done();
     }
@@ -412,9 +480,8 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   }
 
   protected void delete(String name) throws IOException {
-    Path path = getPath();
-    FSDataInputRandomAccess inputStream = _inputMap.remove(path);
-    Tracer trace = Trace.trace("filesystem - delete", Trace.param("path", path));
+    FSDataInputRandomAccess inputStream = _inputMap.remove(name);
+    Tracer trace = Trace.trace("filesystem - delete", Trace.param("path", getPath(name)));
     if (inputStream != null) {
       IOUtils.closeQuietly(inputStream);
     }
@@ -440,7 +507,6 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
       }
       return fStat._length;
     }
-
     return length(name);
   }
 
@@ -469,10 +535,79 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   }
 
   protected Path getPath(String name) throws IOException {
-    if (isSymlink(name)) {
+    if (isCopyFileAvailable(name)) {
+      return getRealFilePathFromCopyFile(name);
+    } else if (isSymlink(name)) {
       return getRealFilePathFromSymlink(name);
+    } else {
+      return new Path(_path, name);
+    }
+  }
+
+  protected Path getRealFilePathFromCopyFile(final String name) throws IOException {
+    // Consult the copy-file path cache first, when caching is enabled.
+    if (_useCache) {
+      Path path = _copyFilePathMap.get(name);
+      if (path != null) {
+        return path;
+      }
+    }
+    Tracer trace = Trace.trace("filesystem - getRealFilePathFromCopyFile", Trace.param("name", name));
+    try {
+      FileStatus[] listStatus = _fileSystem.listStatus(_path, new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          String fileName = path.getName();
+          return fileName.startsWith(name) && fileName.endsWith(COPY);
+        }
+      });
+      Path path = getRealFilePathFromCopyFileList(listStatus);
+      if (_useCache) {
+        _copyFilePathMap.put(name, path);
+      }
+      return path;
+    } finally {
+      trace.done();
+    }
+  }
+
+  protected Path getRealFilePathFromCopyFileList(FileStatus[] listStatus) throws IOException {
+    if (listStatus == null || listStatus.length == 0) {
+      throw new IOException("Copy file list empty.");
+    }
+    Arrays.sort(listStatus);
+    return listStatus[listStatus.length - 1].getPath();
+  }
+
+  protected boolean isCopyFileAvailable(final String name) throws IOException {
+    if (_useCache) {
+      Boolean b = _copyFileMap.get(name);
+      if (b != null) {
+        return b;
+      }
+    }
+    Tracer trace = Trace.trace("filesystem - isCopyFileAvailable", Trace.param("name", name));
+    try {
+      FileStatus[] listStatus = _fileSystem.listStatus(_path, new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          String fileName = path.getName();
+          return fileName.startsWith(name) && fileName.endsWith(COPY);
+        }
+      });
+      boolean exists;
+      if (listStatus == null || listStatus.length == 0) {
+        exists = false;
+      } else {
+        exists = true;
+      }
+      if (_useCache) {
+        _copyFileMap.put(name, exists);
+      }
+      return exists;
+    } finally {
+      trace.done();
     }
-    return new Path(_path, name);
   }
 
   protected Path getPathOrSymlinkForDelete(String name) throws IOException {
@@ -548,11 +683,19 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     Path path = getPath(name);
     Tracer trace = Trace.trace("filesystem - fileModified", Trace.param("path", path));
     try {
-      FileStatus fileStatus = _fileSystem.getFileStatus(path);
-      if (_useCache) {
-        _fileStatusMap.put(name, new FStat(fileStatus));
+      if (path.getName().endsWith(COPY)) {
+        long lastModTimeFromCopyFile = getLastModTimeFromCopyFile(path.getName());
+        if (_useCache) {
+          _fileStatusMap.put(name, new FStat(lastModTimeFromCopyFile, fileLength(name)));
+        }
+        return lastModTimeFromCopyFile;
+      } else {
+        FileStatus fileStatus = _fileSystem.getFileStatus(path);
+        if (_useCache) {
+          _fileStatusMap.put(name, new FStat(fileStatus));
+        }
+        return fileStatus.getModificationTime();
       }
-      return fileStatus.getModificationTime();
     } finally {
       trace.done();
     }
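
Nothing in HdfsDirectory triggers the recopy on its own; a caller (for example the shard
server after a failover, or a post-import hook) has to invoke runHdfsCopyFile per file. A
sketch of such a driver, assuming an already-constructed HdfsDirectory; the shard path below
is made up for illustration:

    // Hypothetical driver: recopy every file in a shard directory to localize its blocks.
    import java.io.IOException;

    import org.apache.blur.store.hdfs.HdfsDirectory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class RecopyShard {
      public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        HdfsDirectory dir = new HdfsDirectory(configuration, new Path("/blur/tables/table1/shard-00000000"));
        try {
          for (String name : dir.listAll()) {
            dir.runHdfsCopyFile(name); // writes <name>~...tmp, renames to ...copy, swaps readers
          }
        } finally {
          dir.close();
        }
      }
    }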

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5657f1b0/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryCopyFileTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryCopyFileTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryCopyFileTest.java
new file mode 100644
index 0000000..33a80dc
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryCopyFileTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HdfsDirectoryCopyFileTest {
+
+  private Path _base;
+  private Configuration _configuration;
+  private FileSystem _fileSystem;
+
+  @Before
+  public void setup() throws IOException {
+    _base = new Path("./target/tmp/HdfsDirectoryTest");
+    _configuration = new Configuration();
+
+    _fileSystem = _base.getFileSystem(_configuration);
+    _fileSystem.delete(_base, true);
+    _fileSystem.mkdirs(_base);
+  }
+
+  @Test
+  public void testFileCopyTest() throws IOException, InterruptedException {
+    HdfsDirectory dir1 = new HdfsDirectory(_configuration, new Path(_base, "dir1"));
+    String name = "file1.txt";
+    IndexOutput out = dir1.createOutput(name, IOContext.DEFAULT);
+    out.writeLong(0L);
+    out.close();
+
+    long fileModified1 = dir1.getFileModified(name);
+    long fileLength1 = dir1.fileLength(name);
+    String[] listAll1 = dir1.listAll();
+
+    Thread.sleep(100);
+    dir1.runHdfsCopyFile(name);
+
+    Path path = dir1.getPath();
+    FileSystem fileSystem = path.getFileSystem(_configuration);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus status : listStatus) {
+      System.out.println(status.getPath());
+    }
+
+    {
+      long fileModified2 = dir1.getFileModified(name);
+      long fileLength2 = dir1.fileLength(name);
+      String[] listAll2 = dir1.listAll();
+
+      assertEquals(fileLength1, fileLength2);
+      assertEquals(fileModified1, fileModified2);
+      assertTrue(Arrays.equals(listAll1, listAll2));
+    }
+
+    dir1.close();
+
+    HdfsDirectory dir2 = new HdfsDirectory(_configuration, new Path(_base, "dir1"));
+    {
+      long fileModified2 = dir2.getFileModified(name);
+      long fileLength2 = dir2.fileLength(name);
+      String[] listAll2 = dir2.listAll();
+
+      assertEquals(fileLength1, fileLength2);
+      assertEquals(fileModified1, fileModified2);
+      assertTrue(Arrays.equals(listAll1, listAll2));
+    }
+
+    dir2.close();
+
+  }
+
+}
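
Taken together, the assertions pin down the intended contract: a recopy must be invisible to
Lucene. listAll() reports the same logical names before and after the copy, fileLength() is
unchanged, and the original last-modified time survives both the copy and a directory reopen,
because it is recovered from the ~lastMod_timestamp encoding in the on-disk file name rather
than from the new file's HDFS modification time.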

