incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/3] git commit: Removing the copy-file logic, because it would cause too much NameNode (NN) load. Also adding a file-collection parameter to the constructor for faster HDFS directory startup when using MapReduce.
Date Tue, 28 Apr 2015 13:05:45 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master eba5af979 -> 0092462c6


Removing the copy-file logic, because it would cause too much NameNode (NN) load. Also adding a file-collection
parameter to the constructor for faster HDFS directory startup when using MapReduce.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/7c308901
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/7c308901
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/7c308901

Branch: refs/heads/master
Commit: 7c308901bab2a410565b698fc10582b5aa79beb1
Parents: eba5af9
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Apr 28 08:28:02 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Apr 28 08:28:02 2015 -0400

----------------------------------------------------------------------
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 199 +++++--------------
 .../store/hdfs/HdfsDirectoryCopyFileTest.java   |  99 ---------
 2 files changed, 45 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7c308901/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 40931bc..948bc30 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -75,7 +75,6 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   private static final Log LOG = LogFactory.getLog(HdfsDirectory.class);
 
   public static final String LNK = ".lnk";
-  public static final String COPY = ".copy";
   public static final String TMP = ".tmp";
 
   private static final String UTF_8 = "UTF-8";
@@ -152,9 +151,19 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
 
   public HdfsDirectory(Configuration configuration, Path path, SequentialReadControl sequentialReadControl)
       throws IOException {
+    this(configuration, path, sequentialReadControl, null);
+  }
+
+  public HdfsDirectory(Configuration configuration, Path path, SequentialReadControl sequentialReadControl,
+      Collection<String> filesToExpose) throws IOException {
+    if (sequentialReadControl == null) {
+      _sequentialReadControl = new SequentialReadControl(new BlurConfiguration());
+    } else {
+      _sequentialReadControl = sequentialReadControl;
+    }
     _fileSystem = path.getFileSystem(configuration);
     _path = _fileSystem.makeQualified(path);
-    _sequentialReadControl = sequentialReadControl;
+
     if (_path.toUri().getScheme().equals(HDFS_SCHEMA)) {
       _asyncClosing = true;
     } else {
@@ -173,40 +182,40 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
       _metricsGroup = metricsGroup;
     }
     if (_useCache) {
-      FileStatus[] listStatus = _fileSystem.listStatus(_path);
-      for (FileStatus fileStatus : listStatus) {
-        if (!fileStatus.isDir()) {
-          Path p = fileStatus.getPath();
-          String name = p.getName();
-          long lastMod;
-          long length;
-          String resolvedName;
-          if (name.endsWith(LNK)) {
-            resolvedName = getRealFileName(name);
-            Path resolvedPath = getPath(resolvedName);
-            FileStatus resolvedFileStatus = _fileSystem.getFileStatus(resolvedPath);
-            lastMod = resolvedFileStatus.getModificationTime();
-          } else if (name.endsWith(COPY)) {
-            resolvedName = getRealFileName(name);
-            lastMod = getLastModTimeFromCopyFile(name);
-          } else {
-            resolvedName = name;
-            lastMod = fileStatus.getModificationTime();
-          }
-          length = length(resolvedName);
-          _fileStatusMap.put(resolvedName, new FStat(lastMod, length));
+      if (filesToExpose == null) {
+        FileStatus[] listStatus = _fileSystem.listStatus(_path);
+        for (FileStatus fileStatus : listStatus) {
+          addToCache(fileStatus);
+        }
+      } else {
+        for (String file : filesToExpose) {
+          Path filePath = getPath(file);
+          FileStatus fileStatus = _fileSystem.getFileStatus(filePath);
+          addToCache(fileStatus);
         }
       }
     }
   }
 
-  protected long getLastModTimeFromCopyFile(String name) {
-    int indexOf = name.indexOf('~');
-    int lastIndexOf = name.lastIndexOf(COPY);
-    String lastModPlusTs = name.substring(indexOf + 1, lastIndexOf);
-    int index = lastModPlusTs.indexOf('_');
-    String lastMod = lastModPlusTs.substring(0, index);
-    return Long.parseLong(lastMod);
+  private void addToCache(FileStatus fileStatus) throws IOException {
+    if (!fileStatus.isDir()) {
+      Path p = fileStatus.getPath();
+      String name = p.getName();
+      long lastMod;
+      long length;
+      String resolvedName;
+      if (name.endsWith(LNK)) {
+        resolvedName = getRealFileName(name);
+        Path resolvedPath = getPath(resolvedName);
+        FileStatus resolvedFileStatus = _fileSystem.getFileStatus(resolvedPath);
+        lastMod = resolvedFileStatus.getModificationTime();
+      } else {
+        resolvedName = name;
+        lastMod = fileStatus.getModificationTime();
+      }
+      length = length(resolvedName);
+      _fileStatusMap.put(resolvedName, new FStat(lastMod, length));
+    }
   }
 
   private static TimerTask getSequentialRefClosingQueueTimerTask() {
@@ -245,9 +254,6 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     if (name.endsWith(LNK)) {
       int lastIndexOf = name.lastIndexOf(LNK);
       return name.substring(0, lastIndexOf);
-    } else if (name.endsWith(COPY)) {
-      int lastIndexOf = name.lastIndexOf('~');
-      return name.substring(0, lastIndexOf);
     }
     return name;
   }
@@ -337,37 +343,11 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     }
   }
 
-  public void runHdfsCopyFile(String name) throws IOException {
-    Path path = getPath(name);
-    FSDataInputStream inputStream = _fileSystem.open(path);
-
-    Path copyOutputTmpPath = getCopyOutputTmpPath(name);
-    FSDataOutputStream outputStream = _fileSystem.create(copyOutputTmpPath);
-    IOUtils.copy(inputStream, outputStream);
-    inputStream.close();
-    outputStream.close();
-
-    Path copyOutputPath = getCopyOutputPath(name);
-    if (_fileSystem.rename(copyOutputTmpPath, copyOutputPath)) {
-      FSDataInputStream newInput = _fileSystem.open(copyOutputPath);
-      FSDataInputRandomAccess oldInput = _inputMap.put(name, toFSDataInputRandomAccess(copyOutputPath,
newInput));
-      oldInput.close();
-    } else {
-      LOG.error("Unknown error while trying to commit copy file [{0}]", copyOutputPath);
-      _fileSystem.delete(copyOutputTmpPath, false);
-    }
-  }
-
   private Path getCopyOutputTmpPath(String name) throws IOException {
     long fileModified = getFileModified(name);
     return new Path(_path, name + "~" + fileModified + "_" + System.currentTimeMillis() +
TMP);
   }
 
-  protected Path getCopyOutputPath(String name) throws IOException {
-    long fileModified = getFileModified(name);
-    return new Path(_path, name + "~" + fileModified + "_" + System.currentTimeMillis() +
COPY);
-  }
-
   @Override
   public IndexInput openInput(String name, IOContext context) throws IOException {
     LOG.debug("openInput [{0}] [{1}] [{2}]", name, context, getPath());
@@ -446,8 +426,6 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
         String name = files[i].getPath().getName();
         if (name.endsWith(LNK)) {
           result.add(getRealFileName(name));
-        } else if (name.endsWith(COPY)) {
-          result.add(getRealFileName(name));
         } else {
           result.add(name);
         }
@@ -555,42 +533,13 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   }
 
   protected Path getPath(String name) throws IOException {
-    if (isCopyFileAvailable(name)) {
-      return getRealFilePathFromCopyFile(name);
-    } else if (isSymlink(name)) {
+    if (isSymlink(name)) {
       return getRealFilePathFromSymlink(name);
     } else {
       return new Path(_path, name);
     }
   }
 
-  protected Path getRealFilePathFromCopyFile(final String name) throws IOException {
-    // need to cache
-    if (_useCache) {
-      Path path = _copyFilePathMap.get(name);
-      if (path != null) {
-        return path;
-      }
-    }
-    Tracer trace = Trace.trace("filesystem - getRealFilePathFromCopyFile", Trace.param("name",
name));
-    try {
-      FileStatus[] listStatus = _fileSystem.listStatus(_path, new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          String fileName = path.getName();
-          return fileName.startsWith(name) && fileName.endsWith(COPY);
-        }
-      });
-      Path path = getRealFilePathFromCopyFileList(listStatus);
-      if (_useCache) {
-        _copyFilePathMap.put(name, path);
-      }
-      return path;
-    } finally {
-      trace.done();
-    }
-  }
-
   protected Path getRealFilePathFromCopyFileList(FileStatus[] listStatus) throws IOException
{
     if (listStatus == null || listStatus.length == 0) {
       throw new IOException("Copy file list empty.");
@@ -599,56 +548,6 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     return listStatus[listStatus.length - 1].getPath();
   }
 
-  protected boolean isCopyFileAvailable(final String name) throws IOException {
-    if (_useCache) {
-      Boolean b = _copyFileMap.get(name);
-      if (b != null) {
-        return b;
-      }
-    }
-    Tracer trace = Trace.trace("filesystem - isCopyFileAvailable", Trace.param("name", name));
-    try {
-      FileStatus[] listStatus;
-      int retryCount = 0;
-      while (true) {
-        try {
-          listStatus = _fileSystem.listStatus(_path, new PathFilter() {
-            @Override
-            public boolean accept(Path path) {
-              String fileName = path.getName();
-              return fileName.startsWith(name) && fileName.endsWith(COPY);
-            }
-          });
-          break;
-        } catch (FileNotFoundException e) {
-          // Wait and retry
-          if (retryCount >= 5) {
-            throw e;
-          }
-          LOG.debug("File not found exception can occur while changes are being made to the
file system, retrying.", e);
-          try {
-            Thread.sleep(100 * (retryCount + 1));
-          } catch (InterruptedException ex) {
-            throw e;
-          }
-          retryCount++;
-        }
-      }
-      boolean exists;
-      if (listStatus == null || listStatus.length == 0) {
-        exists = false;
-      } else {
-        exists = true;
-      }
-      if (_useCache) {
-        _copyFileMap.put(name, exists);
-      }
-      return exists;
-    } finally {
-      trace.done();
-    }
-  }
-
   protected Path getPathOrSymlinkForDelete(String name) throws IOException {
     if (isSymlink(name)) {
       return new Path(_path, name + LNK);
@@ -722,19 +621,11 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     Path path = getPath(name);
     Tracer trace = Trace.trace("filesystem - fileModified", Trace.param("path", path));
     try {
-      if (path.getName().endsWith(COPY)) {
-        long lastModTimeFromCopyFile = getLastModTimeFromCopyFile(path.getName());
-        if (_useCache) {
-          _fileStatusMap.put(name, new FStat(lastModTimeFromCopyFile, fileLength(name)));
-        }
-        return lastModTimeFromCopyFile;
-      } else {
-        FileStatus fileStatus = _fileSystem.getFileStatus(path);
-        if (_useCache) {
-          _fileStatusMap.put(name, new FStat(fileStatus));
-        }
-        return fileStatus.getModificationTime();
+      FileStatus fileStatus = _fileSystem.getFileStatus(path);
+      if (_useCache) {
+        _fileStatusMap.put(name, new FStat(fileStatus));
       }
+      return fileStatus.getModificationTime();
     } finally {
       trace.done();
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7c308901/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryCopyFileTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryCopyFileTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryCopyFileTest.java
deleted file mode 100644
index 47f195a..0000000
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryCopyFileTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.store.hdfs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexOutput;
-import org.junit.Before;
-import org.junit.Test;
-
-public class HdfsDirectoryCopyFileTest {
-
-  private Path _base;
-  private Configuration _configuration;
-  private FileSystem _fileSystem;
-
-  @Before
-  public void setup() throws IOException {
-    _base = new Path("./target/tmp/HdfsDirectoryTest");
-    _configuration = new Configuration();
-
-    _fileSystem = _base.getFileSystem(_configuration);
-    _fileSystem.delete(_base, true);
-    _fileSystem.mkdirs(_base);
-  }
-
-//  @Test
-  public void testFileCopyTest() throws IOException, InterruptedException {
-    HdfsDirectory dir1 = new HdfsDirectory(_configuration, new Path(_base, "dir1"));
-    String name = "file1.txt";
-    IndexOutput out = dir1.createOutput(name, IOContext.DEFAULT);
-    out.writeLong(0L);
-    out.close();
-
-    long fileModified1 = dir1.getFileModified(name);
-    long fileLength1 = dir1.fileLength(name);
-    String[] listAll1 = dir1.listAll();
-
-    Thread.sleep(100);
-    dir1.runHdfsCopyFile(name);
-
-    Path path = dir1.getPath();
-    FileSystem fileSystem = path.getFileSystem(_configuration);
-    FileStatus[] listStatus = fileSystem.listStatus(path);
-    for (FileStatus status : listStatus) {
-      System.out.println(status.getPath());
-    }
-
-    {
-      long fileModified2 = dir1.getFileModified(name);
-      long fileLength2 = dir1.fileLength(name);
-      String[] listAll2 = dir1.listAll();
-
-      assertEquals(fileLength1, fileLength2);
-      assertEquals(fileModified1, fileModified2);
-      assertTrue(Arrays.equals(listAll1, listAll2));
-    }
-
-    dir1.close();
-
-    HdfsDirectory dir2 = new HdfsDirectory(_configuration, new Path(_base, "dir1"));
-    {
-      long fileModified2 = dir2.getFileModified(name);
-      long fileLength2 = dir2.fileLength(name);
-      String[] listAll2 = dir2.listAll();
-
-      assertEquals(fileLength1, fileLength2);
-      assertEquals(fileModified1, fileModified2);
-      assertTrue(Arrays.equals(listAll1, listAll2));
-    }
-
-    dir2.close();
-
-  }
-
-}


Mime
View raw message