accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [1/2] git commit: ACCUMULO-3193 use a thread pool to jam more requests into the NN, which will hopefully compete better with the other NN requests
Date Fri, 03 Oct 2014 20:40:21 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master c9eaa7ff1 -> c217a7d60


ACCUMULO-3193 use a thread pool to issue more concurrent file-rename requests to the HDFS NameNode (NN), which will hopefully let bulk imports compete better with the other NN requests


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5cbcffbb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5cbcffbb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5cbcffbb

Branch: refs/heads/master
Commit: 5cbcffbb4ce08325af6451eb2941ece6c3886598
Parents: 2b689a2
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Fri Oct 3 16:40:10 2014 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Fri Oct 3 16:40:10 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   1 +
 .../accumulo/master/tableOps/BulkImport.java    | 113 +++++++++++--------
 2 files changed, 70 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5cbcffbb/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 19f1961..a879c4a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -173,6 +173,7 @@ public enum Property {
   MASTER_BULK_RETRIES("master.bulk.retries", "3", PropertyType.COUNT, "The number of attempts
to bulk-load a file before giving up."),
   MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The
number of threads to use when coordinating a bulk-import."),
   MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to
wait for a tablet server to process a bulk import request"),
+  MASTER_BULK_RENAME_THREADS("master.bulk.rename.threadpool.size", "20", PropertyType.COUNT,
"The number of threads to use when moving user files to bulk ingest directories under accumulo
control"),
   MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT, "The minimum
number of threads to use to handle incoming requests."),
   MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The
time between adjustments of the server thread pool."),
   MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5cbcffbb/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 072fda1..b55b315 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
@@ -96,13 +97,13 @@ import org.apache.thrift.TException;
  * that *a* request completed by seeing the flag written into the metadata
  * table, but we won't know if some other rogue thread is still waiting to start
  * a thread and repeat the operation.
- * 
+ *
  * The master can ask the tablet server if it has any requests still running.
  * Except the tablet server might have some thread about to start a request, but
  * before it has made any bookkeeping about the request. To prevent problems
  * like this, an Arbitrator is used. Before starting any new request, the tablet
  * server checks the Arbitrator to see if the request is still valid.
- * 
+ *
  */
 
 public class BulkImport extends MasterRepo {
@@ -174,7 +175,7 @@ public class BulkImport extends MasterRepo {
 
     // move the files into the directory
     try {
-      String bulkDir = prepareBulkImport(fs, sourceDir, tableId);
+      String bulkDir = prepareBulkImport(master, fs, sourceDir, tableId);
       log.debug(" tid " + tid + " bulkDir " + bulkDir);
       return new LoadFiles(tableId, sourceDir, bulkDir, errorDir, setTime);
     } catch (IOException ex) {
@@ -217,60 +218,84 @@ public class BulkImport extends MasterRepo {
 
   //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
   @SuppressWarnings("deprecation")
-  private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException
{
-    Path bulkDir = createNewBulkDir(fs, tableId);
+  private String prepareBulkImport(Master master, final VolumeManager fs, String dir, String
tableId) throws Exception {
+    final Path bulkDir = createNewBulkDir(fs, tableId);
 
     MetadataTableUtil.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/"
+ bulkDir.getName());
 
     Path dirPath = new Path(dir);
     FileStatus[] mapFiles = fs.listStatus(dirPath);
 
-    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+    final UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+
+    int workerCount = master.getConfiguration().getCount(Property.MASTER_BULK_RENAME_THREADS);
+    SimpleThreadPool workers = new SimpleThreadPool(workerCount, "bulk move");
+    List<Future<Exception>> results = new ArrayList<>();
+
+    for (FileStatus file : mapFiles) {
+      final FileStatus fileStatus = file;
+      results.add(workers.submit(new Callable<Exception>() {
+        @Override
+        public Exception call() throws Exception {
+          try {
+            String sa[] = fileStatus.getPath().getName().split("\\.");
+            String extension = "";
+            if (sa.length > 1) {
+              extension = sa[sa.length - 1];
+
+              if (!FileOperations.getValidExtensions().contains(extension)) {
+                log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring");
+                return null;
+              }
+            } else {
+              // assume it is a map file
+              extension = Constants.MAPFILE_EXTENSION;
+            }
 
-    for (FileStatus fileStatus : mapFiles) {
-      String sa[] = fileStatus.getPath().getName().split("\\.");
-      String extension = "";
-      if (sa.length > 1) {
-        extension = sa[sa.length - 1];
+            if (extension.equals(Constants.MAPFILE_EXTENSION)) {
+              if (!fileStatus.isDir()) {
+                log.warn(fileStatus.getPath() + " is not a map file, ignoring");
+                return null;
+              }
 
-        if (!FileOperations.getValidExtensions().contains(extension)) {
-          log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring");
-          continue;
-        }
-      } else {
-        // assume it is a map file
-        extension = Constants.MAPFILE_EXTENSION;
-      }
+              if (fileStatus.getPath().getName().equals("_logs")) {
+                log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce
task, skipping");
+                return null;
+              }
+              try {
+                FileStatus dataStatus = fs.getFileStatus(new Path(fileStatus.getPath(), MapFile.DATA_FILE_NAME));
+                if (dataStatus.isDir()) {
+                  log.warn(fileStatus.getPath() + " is not a map file, ignoring");
+                  return null;
+                }
+              } catch (FileNotFoundException fnfe) {
+                log.warn(fileStatus.getPath() + " is not a map file, ignoring");
+                return null;
+              }
+            }
 
-      if (extension.equals(Constants.MAPFILE_EXTENSION)) {
-        if (!fileStatus.isDir()) {
-          log.warn(fileStatus.getPath() + " is not a map file, ignoring");
-          continue;
-        }
+            String newName = "I" + namer.getNextName() + "." + extension;
+            Path newPath = new Path(bulkDir, newName);
+            try {
+              fs.rename(fileStatus.getPath(), newPath);
+              log.debug("Moved " + fileStatus.getPath() + " to " + newPath);
+            } catch (IOException E1) {
+              log.error("Could not move: " + fileStatus.getPath().toString() + " " + E1.getMessage());
+            }
 
-        if (fileStatus.getPath().getName().equals("_logs")) {
-          log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce
task, skipping");
-          continue;
-        }
-        try {
-          FileStatus dataStatus = fs.getFileStatus(new Path(fileStatus.getPath(), MapFile.DATA_FILE_NAME));
-          if (dataStatus.isDir()) {
-            log.warn(fileStatus.getPath() + " is not a map file, ignoring");
-            continue;
+          } catch (Exception ex) {
+            return ex;
           }
-        } catch (FileNotFoundException fnfe) {
-          log.warn(fileStatus.getPath() + " is not a map file, ignoring");
-          continue;
+          return null;
         }
-      }
+      }));
+    }
+    workers.shutdown();
+    while (!workers.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { }
 
-      String newName = "I" + namer.getNextName() + "." + extension;
-      Path newPath = new Path(bulkDir, newName);
-      try {
-        fs.rename(fileStatus.getPath(), newPath);
-        log.debug("Moved " + fileStatus.getPath() + " to " + newPath);
-      } catch (IOException E1) {
-        log.error("Could not move: " + fileStatus.getPath().toString() + " " + E1.getMessage());
+    for (Future<Exception> ex : results) {
+      if (ex.get() != null) {
+        throw ex.get();
       }
     }
     return bulkDir.toString();


Mime
View raw message