incubator-accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1236413 - in /incubator/accumulo/trunk: ./ src/core/ src/server/ src/server/src/main/java/org/apache/accumulo/server/iterators/ src/server/src/main/java/org/apache/accumulo/server/master/ src/server/src/main/java/org/apache/accumulo/server...
Date Thu, 26 Jan 2012 22:13:36 GMT
Author: kturner
Date: Thu Jan 26 22:13:35 2012
New Revision: 1236413

URL: http://svn.apache.org/viewvc?rev=1236413&view=rev
Log:
ACCUMULO-334 Made splits copy bulk load flags.  Added metadata iterator to delete inactive
bulk load flags.  Made bulk RW test check all markers are present only once. (merged from
1.4)

Added:
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/iterators/
      - copied from r1236412, incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
      - copied unchanged from r1236412, incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
Modified:
    incubator/accumulo/trunk/   (props changed)
    incubator/accumulo/trunk/src/core/   (props changed)
    incubator/accumulo/trunk/src/server/   (props changed)
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java

Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 26 22:13:35 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043
 /incubator/accumulo/branches/1.3.5rc:1209938
-/incubator/accumulo/branches/1.4:1201902-1236217
+/incubator/accumulo/branches/1.4:1201902-1236412

Propchange: incubator/accumulo/trunk/src/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 26 22:13:35 2012
@@ -1,3 +1,3 @@
-/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
 /incubator/accumulo/branches/1.3.5rc/src/core:1209938
-/incubator/accumulo/branches/1.4/src/core:1201902-1236217
+/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
+/incubator/accumulo/branches/1.4/src/core:1201902-1236412

Propchange: incubator/accumulo/trunk/src/server/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 26 22:13:35 2012
@@ -1,3 +1,3 @@
-/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611
 /incubator/accumulo/branches/1.3.5rc/src/server:1209938
-/incubator/accumulo/branches/1.4/src/server:1201902-1236217
+/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611
+/incubator/accumulo/branches/1.4/src/server:1201902-1236412

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1236413&r1=1236412&r2=1236413&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
Thu Jan 26 22:13:35 2012
@@ -93,6 +93,7 @@ import org.apache.accumulo.server.client
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fate.Fate;
 import org.apache.accumulo.server.fate.TStore.TStatus;
+import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
 import org.apache.accumulo.server.master.CoordinateRecoveryTask.JobComplete;
 import org.apache.accumulo.server.master.CoordinateRecoveryTask.LogFile;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
@@ -328,6 +329,9 @@ public class Master implements LiveTServ
 
         IZooReaderWriter zoo = ZooReaderWriter.getInstance();
         
+        TablePropUtil.setTableProperty(Constants.METADATA_TABLE_ID, Property.TABLE_ITERATOR_PREFIX.getKey()
+ "majc.bulkLoadFilter", "20,"
+            + MetadataBulkLoadFilter.class.getName());
+        
         zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, new byte[0],
NodeExistsPolicy.SKIP);
         zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZHDFS_RESERVATIONS, new
byte[0], NodeExistsPolicy.SKIP);
         zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZNEXT_FILE, new byte[]
{'0'}, NodeExistsPolicy.SKIP);

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1236413&r1=1236412&r2=1236413&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
Thu Jan 26 22:13:35 2012
@@ -3432,9 +3432,14 @@ public class Tablet {
       
       String time = tabletTime.getMetadataValue();
       
+      // it is possible that some of the bulk loading flags will be deleted after being read
below because the bulk load
+      // finishes.... therefore split could propogate load flags for a finished bulk load...
there is a special iterator
+      // on the !METADATA table to clean up this type of garbage
+      Map<String,Long> bulkLoadedFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(),
extent);
+
       MetadataTable.splitTablet(high, extent.getPrevEndRow(), splitRatio, SecurityConstants.getSystemCredentials(),
tabletServer.getLock());
-      MetadataTable.addNewTablet(low, lowDirectory, tabletServer.getTabletSession(), lowDatafileSizes,
SecurityConstants.getSystemCredentials(), time,
-          lastFlushID, lastCompactID, tabletServer.getLock());
+      MetadataTable.addNewTablet(low, lowDirectory, tabletServer.getTabletSession(), lowDatafileSizes,
bulkLoadedFiles,
+          SecurityConstants.getSystemCredentials(), time, lastFlushID, lastCompactID, tabletServer.getLock());
       MetadataTable.finishSplit(high, highDatafileSizes, highDatafilesToRemove, SecurityConstants.getSystemCredentials(),
tabletServer.getLock());
       
       log.log(TLevel.TABLET_HIST, extent + " split " + low + " " + high);

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1236413&r1=1236412&r2=1236413&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
Thu Jan 26 22:13:35 2012
@@ -163,9 +163,11 @@ public class SplitRecoveryTest extends F
     m.put(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY, assignment.server.asColumnQualifier(),
assignment.server.asMutationValue());
     writer.update(m);
     
-    if (steps >= 1)
-      MetadataTable.addNewTablet(low, "/lowDir", instance, lowDatafileSizes, SecurityConstants.getSystemCredentials(),
TabletTime.LOGICAL_TIME_ID + "0", -1l,
-          -1l, zl);
+    if (steps >= 1) {
+      Map<String,Long> bulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(),
extent);
+      MetadataTable.addNewTablet(low, "/lowDir", instance, lowDatafileSizes, bulkFiles, SecurityConstants.getSystemCredentials(),
TabletTime.LOGICAL_TIME_ID
+          + "0", -1l, -1l, zl);
+    }
     if (steps >= 2)
       MetadataTable.finishSplit(high, highDatafileSizes, highDatafilesToRemove, SecurityConstants.getSystemCredentials(),
zl);
     

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java?rev=1236413&r1=1236412&r2=1236413&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java
Thu Jan 26 22:13:35 2012
@@ -54,8 +54,10 @@ public class BulkPlusOne extends BulkTes
     }
   }
   public static final Text MARKER_CF = new Text("marker");
-  private static final AtomicLong counter = new AtomicLong();
+  static final AtomicLong counter = new AtomicLong();
   
+  private static final Value ONE = new Value("1".getBytes());
+
   static void bulkLoadLots(Logger log, State state, Value value) throws Exception {
     final Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString());
     final Path fail = new Path(dir.toString() + "_fail");
@@ -74,8 +76,8 @@ public class BulkPlusOne extends BulkTes
     for (Integer row : startRows)
       printRows.add(String.format(FMT, row));
     
-    String markerColumnFamily = Long.toString(counter.incrementAndGet());
-    log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT,
LOTS - 1) + " marker " + markerColumnFamily);
+    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+    log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT,
LOTS - 1) + " marker " + markerColumnQualifier);
     
     List<Integer> rows = new ArrayList<Integer>(startRows);
     rows.add(LOTS);
@@ -91,7 +93,7 @@ public class BulkPlusOne extends BulkTes
         for (Column col : COLNAMES) {
           f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), value);
         }
-        f.append(new Key(row, MARKER_CF, new Text(markerColumnFamily)), value);
+        f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE);
       }
       f.close();
     }
@@ -101,7 +103,7 @@ public class BulkPlusOne extends BulkTes
     FileStatus[] failures = fs.listStatus(fail);
     if (failures != null && failures.length > 0)
       throw new Exception("Failures " + Arrays.asList(failures) + " found importing files
from " + dir);
-    log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT,
LOTS - 1) + " marker " + markerColumnFamily);
+    log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT,
LOTS - 1) + " marker " + markerColumnQualifier);
   }
   
   @Override

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java?rev=1236413&r1=1236412&r2=1236413&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
Thu Jan 26 22:13:35 2012
@@ -57,7 +57,7 @@ public class Setup extends Test {
         tableOps.create(getTableName());
         IteratorSetting is = new IteratorSetting(10, org.apache.accumulo.core.iterators.user.SummingCombiner.class);
         SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
-        SummingCombiner.setColumns(is, BulkPlusOne.COLNAMES);
+        SummingCombiner.setCombineAllColumns(is, true);
         tableOps.attachIterator(getTableName(), is);
       }
     } catch (TableExistsException ex) {
@@ -65,6 +65,7 @@ public class Setup extends Test {
     }
     state.set("rand", rand);
     state.set("fs", FileSystem.get(CachedConfiguration.getInstance()));
+    BulkPlusOne.counter.set(0l);
     
     BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
     ThreadFactory factory = new ThreadFactory() {

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java?rev=1236413&r1=1236412&r2=1236413&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java
Thu Jan 26 22:13:35 2012
@@ -17,6 +17,7 @@
 package org.apache.accumulo.server.test.randomwalk.bulk;
 
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -24,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -58,6 +60,37 @@ public class Verify extends Test {
         throw new Exception("Bad key at " + entry);
       }
     }
+    
+    scanner.clearColumns();
+    scanner.fetchColumnFamily(BulkPlusOne.MARKER_CF);
+    RowIterator rowIter = new RowIterator(scanner);
+    
+    while (rowIter.hasNext()) {
+      Iterator<Entry<Key,Value>> row = rowIter.next();
+      long prev = 0;
+      Text rowText = null;
+      while (row.hasNext()) {
+        Entry<Key,Value> entry = row.next();
+        
+        if (rowText == null)
+          rowText = entry.getKey().getRow();
+
+        long curr = Long.valueOf(entry.getKey().getColumnQualifier().toString());
+
+        if (curr - 1 != prev)
+          throw new Exception("Bad marker count " + entry.getKey() + " " + entry.getValue()
+ " " + prev);
+        
+        if (!entry.getValue().toString().equals("1"))
+          throw new Exception("Bad marker value " + entry.getKey() + " " + entry.getValue());
+        
+        prev = curr;
+      }
+      
+      if (BulkPlusOne.counter.get() != prev) {
+        throw new Exception("Row " + rowText + " does not have all markers " + BulkPlusOne.counter.get()
+ " " + prev);
+      }
+    }
+
     log.info("Test successful on table " + Setup.getTableName());
     state.getConnector().tableOperations().delete(Setup.getTableName());
   }

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java?rev=1236413&r1=1236412&r2=1236413&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
Thu Jan 26 22:13:35 2012
@@ -46,6 +46,7 @@ import org.apache.accumulo.server.Server
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.constraints.MetadataConstraints;
+import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.security.ZKAuthenticator;
@@ -92,6 +93,7 @@ public class Initialize {
     initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions",
"1");
     initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10,"
+ VersioningIterator.class.getName());
     initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions",
"1");
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter",
"20," + MetadataBulkLoadFilter.class.getName());
     initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
     initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet",
         String.format("%s,%s", Constants.METADATA_TABLET_COLUMN_FAMILY.toString(), Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY.toString()));

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1236413&r1=1236412&r2=1236413&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
Thu Jan 26 22:13:35 2012
@@ -417,8 +417,8 @@ public class MetadataTable extends org.a
     return sizes;
   }
   
-  public static void addNewTablet(KeyExtent extent, String path, TServerInstance location,
Map<String,DataFileValue> datafileSizes, AuthInfo credentials,
-      String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
+  public static void addNewTablet(KeyExtent extent, String path, TServerInstance location,
Map<String,DataFileValue> datafileSizes,
+      Map<String,Long> bulkLoadedFiles, AuthInfo credentials, String time, long lastFlushID,
long lastCompactID, ZooLock zooLock) {
     Mutation m = extent.getPrevRowUpdateMutation();
     
     ColumnFQ.put(m, Constants.METADATA_DIRECTORY_COLUMN, new Value(path.getBytes()));
@@ -437,6 +437,11 @@ public class MetadataTable extends org.a
       m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(entry.getValue().encode()));
     }
     
+    for (Entry<String,Long> entry : bulkLoadedFiles.entrySet()) {
+      byte[] tidBytes = Long.toString(entry.getValue()).getBytes();
+      m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(tidBytes));
+    }
+
     update(credentials, zooLock, m);
   }
   
@@ -594,7 +599,8 @@ public class MetadataTable extends org.a
     
     if (!scanner2.iterator().hasNext()) {
       log.debug("Prev tablet " + prevRowKey + " does not exist, need to create it " + metadataPrevEndRow
+ " " + prevPrevEndRow + " " + splitRatio);
-      MetadataTable.addNewTablet(low, lowDirectory, tserver, lowDatafileSizes, credentials,
time, initFlushID, initCompactID, lock);
+      Map<String,Long> bulkFiles = getBulkFilesLoaded(credentials, metadataEntry);
+      MetadataTable.addNewTablet(low, lowDirectory, tserver, lowDatafileSizes, bulkFiles,
credentials, time, initFlushID, initCompactID, lock);
     } else {
       log.debug("Prev tablet " + prevRowKey + " exist, do not need to add it");
     }
@@ -1245,6 +1251,26 @@ public class MetadataTable extends org.a
     }
   }
   
+  public static Map<String,Long> getBulkFilesLoaded(AuthInfo credentials, KeyExtent
extent) {
+    return getBulkFilesLoaded(credentials, extent.getMetadataEntry());
+  }
+  
+  public static Map<String,Long> getBulkFilesLoaded(AuthInfo credentials, Text metadataRow)
{
+    
+    Map<String,Long> ret = new HashMap<String,Long>();
+    
+    Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID,
Constants.NO_AUTHS);
+    scanner.setRange(new Range(metadataRow));
+    scanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY);
+    for (Entry<Key,Value> entry : scanner) {
+      String file = entry.getKey().getColumnQualifier().toString();
+      Long tid = Long.parseLong(entry.getValue().toString());
+      
+      ret.put(file, tid);
+    }
+    return ret;
+  }
+
   public static void addBulkLoadInProgressFlag(String path) {
     
     Mutation m = new Mutation(Constants.METADATA_BLIP_FLAG_PREFIX + path);



Mime
View raw message