accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhava...@apache.org
Subject [2/3] git commit: ACCUMULO-2854 Added additional check to prevent NPE
Date Wed, 18 Jun 2014 21:32:56 GMT
ACCUMULO-2854 Added additional check to prevent NPE

Signed-off-by: Bill Havanki <bhavanki@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/079ef51c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/079ef51c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/079ef51c

Branch: refs/heads/master
Commit: 079ef51c7c254f1f7bd7bd4d83ea405ae635b433
Parents: 5c409b0
Author: Jeffrey S. Schwartz <jeff@schwartech.com>
Authored: Thu Jun 12 20:07:54 2014 -0400
Committer: Bill Havanki <bhavanki@cloudera.com>
Committed: Wed Jun 18 17:07:38 2014 -0400

----------------------------------------------------------------------
 .../accumulo/master/tableOps/BulkImport.java    | 150 ++++++++++---------
 1 file changed, 76 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/079ef51c/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index bdc89dd..e42fee6 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@ -106,28 +106,28 @@ import org.apache.thrift.TException;
 
 public class BulkImport extends MasterRepo {
   public static final String FAILURES_TXT = "failures.txt";
-  
+
   private static final long serialVersionUID = 1L;
-  
+
   private static final Logger log = Logger.getLogger(BulkImport.class);
-  
+
   private String tableId;
   private String sourceDir;
   private String errorDir;
   private boolean setTime;
-  
+
   public BulkImport(String tableId, String sourceDir, String errorDir, boolean setTime) {
     this.tableId = tableId;
     this.sourceDir = sourceDir;
     this.errorDir = errorDir;
     this.setTime = setTime;
   }
-  
+
   @Override
   public long isReady(long tid, Master master) throws Exception {
     if (!Utils.getReadLock(tableId, tid).tryLock())
       return 100;
-    
+
     Instance instance = HdfsZooInstance.getInstance();
     Tables.clearCache(instance);
     if (Tables.getTableState(instance, tableId) == TableState.ONLINE) {
@@ -140,18 +140,18 @@ public class BulkImport extends MasterRepo {
       throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
     }
   }
-  
+
   @Override
   //TODO Remove deprecation warning suppression when Hadoop1 support is dropped
   @SuppressWarnings("deprecation")
   public Repo<Master> call(long tid, Master master) throws Exception {
     log.debug(" tid " + tid + " sourceDir " + sourceDir);
-    
+
     Utils.getReadLock(tableId, tid).lock();
-    
+
     // check that the error directory exists and is empty
     VolumeManager fs = master.getFileSystem();
-    
+
     Path errorPath = new Path(errorDir);
     FileStatus errorStatus = null;
     try {
@@ -168,9 +168,9 @@ public class BulkImport extends MasterRepo {
     if (fs.listStatus(errorPath).length != 0)
       throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir
           + " is not empty");
-    
+
     ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
-    
+
     // move the files into the directory
     try {
       String bulkDir = prepareBulkImport(fs, sourceDir, tableId);
@@ -182,24 +182,26 @@ public class BulkImport extends MasterRepo {
           + ex);
     }
   }
-  
+
   private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
-    
-    String tableDir = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs()).toString();
-    
+    Path tempPath = fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs());
+    if (tempPath == null)
+      throw new IllegalStateException(sourceDir + " is not in a known namespace");
+
+    String tableDir = tempPath.toString();
     if (tableDir == null)
       throw new IllegalStateException(sourceDir + " is not in a known namespace");
     Path directory = new Path(tableDir + "/" + tableId);
     fs.mkdirs(directory);
-    
+
     // only one should be able to create the lock file
     // the purpose of the lock file is to avoid a race
     // condition between the call to fs.exists() and
     // fs.mkdirs()... if only hadoop had a mkdir() function
     // that failed when the dir existed
-    
+
     UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
-    
+
     while (true) {
       Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
       if (fs.exists(newBulkDir)) // sanity check
@@ -207,7 +209,7 @@ public class BulkImport extends MasterRepo {
       if (fs.mkdirs(newBulkDir))
         return newBulkDir;
       log.warn("Failed to create " + newBulkDir + " for unknown reason");
-      
+
       UtilWaitThread.sleep(3000);
     }
   }
@@ -216,20 +218,20 @@ public class BulkImport extends MasterRepo {
   @SuppressWarnings("deprecation")
   private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
     Path bulkDir = createNewBulkDir(fs, tableId);
-    
+
     MetadataTableUtil.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-    
+
     Path dirPath = new Path(dir);
     FileStatus[] mapFiles = fs.listStatus(dirPath);
-    
+
     UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
-    
+
     for (FileStatus fileStatus : mapFiles) {
       String sa[] = fileStatus.getPath().getName().split("\\.");
       String extension = "";
       if (sa.length > 1) {
         extension = sa[sa.length - 1];
-        
+
         if (!FileOperations.getValidExtensions().contains(extension)) {
           log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring");
           continue;
@@ -238,13 +240,13 @@ public class BulkImport extends MasterRepo {
         // assume it is a map file
         extension = Constants.MAPFILE_EXTENSION;
       }
-      
+
       if (extension.equals(Constants.MAPFILE_EXTENSION)) {
         if (!fileStatus.isDir()) {
           log.warn(fileStatus.getPath() + " is not a map file, ignoring");
           continue;
         }
-        
+
         if (fileStatus.getPath().getName().equals("_logs")) {
           log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce task, skipping");
           continue;
@@ -260,7 +262,7 @@ public class BulkImport extends MasterRepo {
           continue;
         }
       }
-      
+
       String newName = "I" + namer.getNextName() + "." + extension;
       Path newPath = new Path(bulkDir, newName);
       try {
@@ -272,7 +274,7 @@ public class BulkImport extends MasterRepo {
     }
     return bulkDir.toString();
   }
-  
+
   @Override
   public void undo(long tid, Master environment) throws Exception {
     // unreserve source/error directories
@@ -283,23 +285,23 @@ public class BulkImport extends MasterRepo {
 }
 
 class CleanUpBulkImport extends MasterRepo {
-  
+
   private static final long serialVersionUID = 1L;
-  
+
   private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
-  
+
   private String tableId;
   private String source;
   private String bulk;
   private String error;
-  
+
   public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
     this.tableId = tableId;
     this.source = source;
     this.bulk = bulk;
     this.error = error;
   }
-  
+
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     log.debug("removing the bulk processing flag file in " + bulk);
@@ -320,21 +322,21 @@ class CleanUpBulkImport extends MasterRepo {
 }
 
 class CompleteBulkImport extends MasterRepo {
-  
+
   private static final long serialVersionUID = 1L;
-  
+
   private String tableId;
   private String source;
   private String bulk;
   private String error;
-  
+
   public CompleteBulkImport(String tableId, String source, String bulk, String error) {
     this.tableId = tableId;
     this.source = source;
     this.bulk = bulk;
     this.error = error;
   }
-  
+
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
@@ -343,21 +345,21 @@ class CompleteBulkImport extends MasterRepo {
 }
 
 class CopyFailed extends MasterRepo {
-  
+
   private static final long serialVersionUID = 1L;
-  
+
   private String tableId;
   private String source;
   private String bulk;
   private String error;
-  
+
   public CopyFailed(String tableId, String source, String bulk, String error) {
     this.tableId = tableId;
     this.source = source;
     this.bulk = bulk;
     this.error = error;
   }
-  
+
   @Override
   public long isReady(long tid, Master master) throws Exception {
     Set<TServerInstance> finished = new HashSet<TServerInstance>();
@@ -375,19 +377,19 @@ class CopyFailed extends MasterRepo {
       return 0;
     return 500;
   }
-  
+
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     // This needs to execute after the arbiter is stopped
-    
+
     VolumeManager fs = master.getFileSystem();
-    
+
     if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
       return new CleanUpBulkImport(tableId, source, bulk, error);
-    
+
     HashMap<String,String> failures = new HashMap<String,String>();
     HashMap<String,String> loadedFailures = new HashMap<String,String>();
-    
+
     FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
     BufferedReader in = new BufferedReader(new InputStreamReader(failFile, Constants.UTF8));
     try {
@@ -400,18 +402,18 @@ class CopyFailed extends MasterRepo {
     } finally {
       failFile.close();
     }
-    
+
     /*
      * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
      * have no loaded markers.
      */
-    
+
     // determine which failed files were loaded
     Connector conn = master.getConnector();
     Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
     mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
     mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-    
+
     for (Entry<Key,Value> entry : mscanner) {
       if (Long.parseLong(entry.getValue().toString()) == tid) {
         String loadedFile = entry.getKey().getColumnQualifier().toString();
@@ -421,7 +423,7 @@ class CopyFailed extends MasterRepo {
         }
       }
     }
-    
+
     // move failed files that were not loaded
     for (String failure : failures.values()) {
       Path orig = new Path(failure);
@@ -429,47 +431,47 @@ class CopyFailed extends MasterRepo {
       fs.rename(orig, dest);
       log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
     }
-    
+
     if (loadedFailures.size() > 0) {
       DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
           + Constants.ZBULK_FAILED_COPYQ);
-      
+
       HashSet<String> workIds = new HashSet<String>();
-      
+
       for (String failure : loadedFailures.values()) {
         Path orig = new Path(failure);
         Path dest = new Path(error, orig.getName());
-        
+
         if (fs.exists(dest))
           continue;
-        
+
         bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(Constants.UTF8));
         workIds.add(orig.getName());
         log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
       }
-      
+
       bifCopyQueue.waitUntilDone(workIds);
     }
-    
+
     fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
     return new CleanUpBulkImport(tableId, source, bulk, error);
   }
-  
+
 }
 
 class LoadFiles extends MasterRepo {
-  
+
   private static final long serialVersionUID = 1L;
-  
+
   private static ExecutorService threadPool = null;
   private static final Logger log = Logger.getLogger(BulkImport.class);
-  
+
   private String tableId;
   private String source;
   private String bulk;
   private String errorDir;
   private boolean setTime;
-  
+
   public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
     this.tableId = tableId;
     this.source = source;
@@ -477,7 +479,7 @@ class LoadFiles extends MasterRepo {
     this.errorDir = errorDir;
     this.setTime = setTime;
   }
-  
+
   @Override
   public long isReady(long tid, Master master) throws Exception {
     if (master.onlineTabletServers().size() == 0)
@@ -505,7 +507,7 @@ class LoadFiles extends MasterRepo {
       files.add(entry);
     }
     log.debug("tid " + tid + " importing " + files.size() + " files");
-    
+
     Path writable = new Path(this.errorDir, ".iswritable");
     if (!fs.createNewFile(writable)) {
       // Maybe this is a re-try... clear the flag and try again
@@ -515,22 +517,22 @@ class LoadFiles extends MasterRepo {
             "Unable to write to " + this.errorDir);
     }
     fs.delete(writable);
-    
+
     final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
     for (FileStatus f : files)
       filesToLoad.add(f.getPath().toString());
-    
+
     final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
     for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
       List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
-      
+
       if (master.onlineTabletServers().size() == 0)
         log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
-      
+
       while (master.onlineTabletServers().size() == 0) {
         UtilWaitThread.sleep(500);
       }
-      
+
       // Use the threadpool to assign files one-at-a-time to the server
       final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
       for (final String file : filesToLoad) {
@@ -575,7 +577,7 @@ class LoadFiles extends MasterRepo {
         UtilWaitThread.sleep(100);
       }
     }
-    
+
     FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
     BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, Constants.UTF8));
     try {
@@ -586,11 +588,11 @@ class LoadFiles extends MasterRepo {
     } finally {
       out.close();
     }
-    
+
     // return the next step, which will perform cleanup
     return new CompleteBulkImport(tableId, source, bulk, errorDir);
   }
-  
+
   static String sampleList(Collection<?> potentiallyLongList, int max) {
     StringBuffer result = new StringBuffer();
     result.append("[");
@@ -610,5 +612,5 @@ class LoadFiles extends MasterRepo {
     result.append("]");
     return result.toString();
   }
-  
+
 }


Mime
View raw message