hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1471250 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Date Wed, 24 Apr 2013 04:49:47 GMT
Author: stack
Date: Wed Apr 24 04:49:47 2013
New Revision: 1471250

URL: http://svn.apache.org/r1471250
Log:
HBASE-5472 LoadIncrementalHFiles loops forever if the target table misses a CF -- REAPPLY

Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1471250&r1=1471249&r2=1471250&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Apr 24 04:49:47 2013
@@ -187,7 +187,7 @@ public class LoadIncrementalHFiles exten
   /**
    * Perform a bulk load of the given directory into the given
    * pre-existing table.  This method is not threadsafe.
-   * 
+   *
    * @param hfofDir the directory that was provided as the output path
    * of a job using HFileOutputFormat
    * @param table the table to load into
@@ -220,6 +220,27 @@ public class LoadIncrementalHFiles exten
     Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
     try {
       discoverLoadQueue(queue, hfofDir);
+      // check whether there is invalid family name in HFiles to be bulkloaded
+      Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
+      ArrayList<String> familyNames = new ArrayList<String>();
+      for (HColumnDescriptor family : families) {
+        familyNames.add(family.getNameAsString());
+      }
+      ArrayList<String> unmatchedFamilies = new ArrayList<String>();
+      for (LoadQueueItem lqi : queue) {
+        String familyNameInHFile = Bytes.toString(lqi.family);
+        if (!familyNames.contains(familyNameInHFile)) {
+          unmatchedFamilies.add(familyNameInHFile);
+        }
+      }
+      if (unmatchedFamilies.size() > 0) {
+        String msg =
+            "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
+                + unmatchedFamilies + "; valid family names of table "
+                + Bytes.toString(table.getTableName()) + " are: " + familyNames;
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
       int count = 0;
 
       if (queue.isEmpty()) {
@@ -358,7 +379,7 @@ public class LoadIncrementalHFiles exten
     Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
     while (!queue.isEmpty()) {
       final LoadQueueItem item = queue.remove();
-      
+
       final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
         public List<LoadQueueItem> call() throws Exception {
           List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
@@ -492,12 +513,12 @@ public class LoadIncrementalHFiles exten
    * Attempts to do an atomic load of many hfiles into a region.  If it fails,
    * it returns a list of hfiles that need to be retried.  If it is successful
    * it will return an empty list.
-   * 
+   *
    * NOTE: To maintain row atomicity guarantees, region server callable should
    * succeed atomically and fails atomically.
-   * 
+   *
    * Protected for testing.
-   * 
+   *
    * @return empty list if success, list of items to retry on recoverable
    * failure
    */
@@ -650,7 +671,7 @@ public class LoadIncrementalHFiles exten
   private boolean doesTableExist(String tableName) throws Exception {
     return hbAdmin.tableExists(tableName);
   }
-  
+
   /*
    * Infers region boundaries for a new table.
    * Parameter:
@@ -658,29 +679,29 @@ public class LoadIncrementalHFiles exten
    *     If a key is a start key of a file, then it maps to +1
    *     If a key is an end key of a file, then it maps to -1
    * Algo:
-   * 1) Poll on the keys in order: 
-   *    a) Keep adding the mapped values to these keys (runningSum) 
+   * 1) Poll on the keys in order:
+   *    a) Keep adding the mapped values to these keys (runningSum)
    *    b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to a boundary list.
-   * 2) Return the boundary list. 
+   * 2) Return the boundary list.
    */
   public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
     ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
     int runningValue = 0;
     byte[] currStartKey = null;
     boolean firstBoundary = true;
-    
+
     for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
       if (runningValue == 0) currStartKey = item.getKey();
       runningValue += item.getValue();
       if (runningValue == 0) {
         if (!firstBoundary) keysArray.add(currStartKey);
         firstBoundary = false;
-      } 
+      }
     }
-    
+
     return keysArray.toArray(new byte[0][0]);
   }
- 
+
   /*
    * If the table is created for the first time, then "completebulkload" reads the files twice.
    * More modifications necessary if we want to avoid doing it.
@@ -706,7 +727,7 @@ public class LoadIncrementalHFiles exten
     // Build a set of keys
     byte[][] keys;
     TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-    
+
     for (FileStatus stat : familyDirStatuses) {
       if (!stat.isDir()) {
         LOG.warn("Skipping non-directory " + stat.getPath());
@@ -716,10 +737,10 @@ public class LoadIncrementalHFiles exten
       // Skip _logs, etc
       if (familyDir.getName().startsWith("_")) continue;
       byte[] family = familyDir.getName().getBytes();
-     
+
       hcd = new HColumnDescriptor(family);
       htd.addFamily(hcd);
-      
+
       Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
       for (Path hfile : hfiles) {
         if (hfile.getName().startsWith("_")) continue;
@@ -739,7 +760,7 @@ public class LoadIncrementalHFiles exten
           LOG.info("Trying to figure out region boundaries hfile=" + hfile +
             " first=" + Bytes.toStringBinary(first) +
             " last="  + Bytes.toStringBinary(last));
-          
+
           // To eventually infer start key-end key boundaries
           Integer value = map.containsKey(first)? map.get(first):0;
           map.put(first, value+1);
@@ -751,7 +772,7 @@ public class LoadIncrementalHFiles exten
         }
       }
     }
-    
+
     keys = LoadIncrementalHFiles.inferBoundaries(map);
     this.hbAdmin.createTable(htd,keys);
 

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1471250&r1=1471249&r2=1471250&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Wed Apr 24 04:49:47 2013
@@ -53,6 +53,7 @@ import org.junit.experimental.categories
 public class TestLoadIncrementalHFiles {
   private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
   private static final byte[] FAMILY = Bytes.toBytes("myfam");
+  private static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
 
   private static final byte[][] SPLIT_KEYS = new byte[][] {
     Bytes.toBytes("ddd"),
@@ -188,6 +189,11 @@ public class TestLoadIncrementalHFiles {
 
     HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
     HTableDescriptor htd = new HTableDescriptor(TABLE);
+    // set real family name to upper case in purpose to simulate the case that
+    // family name in HFiles is invalid
+    HColumnDescriptor family =
+        new HColumnDescriptor(Bytes.toBytes(new String(FAMILY).toUpperCase()));
+    htd.addFamily(family);
     admin.createTable(htd, SPLIT_KEYS);
 
     HTable table = new HTable(util.getConfiguration(), TABLE);
@@ -198,6 +204,11 @@ public class TestLoadIncrementalHFiles {
       assertTrue("Loading into table with non-existent family should have failed", false);
     } catch (Exception e) {
       assertTrue("IOException expected", e instanceof IOException);
+      // further check whether the exception message is correct
+      String errMsg = e.getMessage();
+      assertTrue("Incorrect exception message, expected message: ["
+          + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + "], current message: [" + errMsg + "]",
+          errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
     }
     table.close();
     admin.close();



Mime
View raw message