accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1465687 - /accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
Date Mon, 08 Apr 2013 17:24:26 GMT
Author: ecn
Date: Mon Apr  8 17:24:25 2013
New Revision: 1465687

URL: http://svn.apache.org/r1465687
Log:
ACCUMULO-1252 remove loaded files outside the threadpool

Modified:
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1465687&r1=1465686&r2=1465687&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
Mon Apr  8 17:24:25 2013
@@ -513,7 +513,7 @@ class LoadFiles extends MasterRepo {
     }
     fs.delete(writable, false);
     
-    final List<String> filesToLoad = Collections.synchronizedList(new ArrayList<String>());
+    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
     for (FileStatus f : files)
       filesToLoad.add(f.getPath().toString());
     
@@ -529,6 +529,7 @@ class LoadFiles extends MasterRepo {
       }
       
       // Use the threadpool to assign files one-at-a-time to the server
+      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
       for (final String file : filesToLoad) {
         results.add(threadPool.submit(new Callable<List<String>>() {
           @Override
@@ -548,7 +549,7 @@ class LoadFiles extends MasterRepo {
               log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
               List<String> fail = client.bulkImportFiles(null, SecurityConstants.getSystemCredentials(),
tid, tableId, attempt, errorDir, setTime);
               if (fail.isEmpty()) {
-                filesToLoad.remove(file);
+                loaded.add(file);
               } else {
                 failures.addAll(fail);
               }
@@ -564,6 +565,7 @@ class LoadFiles extends MasterRepo {
       Set<String> failures = new HashSet<String>();
       for (Future<List<String>> f : results)
         failures.addAll(f.get());
+      filesToLoad.removeAll(loaded);
       if (filesToLoad.size() > 0) {
         log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad,
10) + " failed");
         UtilWaitThread.sleep(100);



Mime
View raw message