Return-Path: X-Original-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5490B9FF3 for ; Thu, 26 Jan 2012 22:14:03 +0000 (UTC) Received: (qmail 65725 invoked by uid 500); 26 Jan 2012 22:14:03 -0000 Delivered-To: apmail-incubator-accumulo-commits-archive@incubator.apache.org Received: (qmail 65689 invoked by uid 500); 26 Jan 2012 22:14:03 -0000 Mailing-List: contact accumulo-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: accumulo-dev@incubator.apache.org Delivered-To: mailing list accumulo-commits@incubator.apache.org Received: (qmail 65679 invoked by uid 99); 26 Jan 2012 22:14:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Jan 2012 22:14:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Jan 2012 22:13:58 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9A1CC23888CD; Thu, 26 Jan 2012 22:13:36 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1236413 - in /incubator/accumulo/trunk: ./ src/core/ src/server/ src/server/src/main/java/org/apache/accumulo/server/iterators/ src/server/src/main/java/org/apache/accumulo/server/master/ src/server/src/main/java/org/apache/accumulo/server... Date: Thu, 26 Jan 2012 22:13:36 -0000 To: accumulo-commits@incubator.apache.org From: kturner@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120126221336.9A1CC23888CD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kturner Date: Thu Jan 26 22:13:35 2012 New Revision: 1236413 URL: http://svn.apache.org/viewvc?rev=1236413&view=rev Log: ACCUMULO-334 Made splits copy bulk load flags. Added metadata iterator to delete inactive bulk load flags. Made bulk RW test check all markers are present only once. (merged from 1.4) Added: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/iterators/ - copied from r1236412, incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java - copied unchanged from r1236412, incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java Modified: incubator/accumulo/trunk/ (props changed) incubator/accumulo/trunk/src/core/ (props changed) incubator/accumulo/trunk/src/server/ (props changed) incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Propchange: incubator/accumulo/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Jan 26 22:13:35 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043 /incubator/accumulo/branches/1.3.5rc:1209938 -/incubator/accumulo/branches/1.4:1201902-1236217 +/incubator/accumulo/branches/1.4:1201902-1236412 Propchange: incubator/accumulo/trunk/src/core/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Jan 26 22:13:35 2012 @@ -1,3 +1,3 @@ -/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215 /incubator/accumulo/branches/1.3.5rc/src/core:1209938 -/incubator/accumulo/branches/1.4/src/core:1201902-1236217 +/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215 +/incubator/accumulo/branches/1.4/src/core:1201902-1236412 Propchange: incubator/accumulo/trunk/src/server/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Jan 26 22:13:35 2012 @@ -1,3 +1,3 @@ -/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611 /incubator/accumulo/branches/1.3.5rc/src/server:1209938 -/incubator/accumulo/branches/1.4/src/server:1201902-1236217 +/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611 +/incubator/accumulo/branches/1.4/src/server:1201902-1236412 Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1236413&r1=1236412&r2=1236413&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java Thu Jan 26 22:13:35 2012 @@ -93,6 +93,7 @@ import org.apache.accumulo.server.client import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fate.Fate; import org.apache.accumulo.server.fate.TStore.TStatus; +import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter; import org.apache.accumulo.server.master.CoordinateRecoveryTask.JobComplete; import org.apache.accumulo.server.master.CoordinateRecoveryTask.LogFile; import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; @@ -328,6 +329,9 @@ public class Master implements LiveTServ IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + TablePropUtil.setTableProperty(Constants.METADATA_TABLE_ID, Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + + MetadataBulkLoadFilter.class.getName()); + zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, new byte[0], NodeExistsPolicy.SKIP); zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZHDFS_RESERVATIONS, new byte[0], NodeExistsPolicy.SKIP); zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZNEXT_FILE, new byte[] {'0'}, NodeExistsPolicy.SKIP); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1236413&r1=1236412&r2=1236413&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Thu Jan 26 22:13:35 2012 @@ -3432,9 +3432,14 @@ public class Tablet { String time = tabletTime.getMetadataValue(); + // it is possible that some of the bulk loading flags will be deleted after being read below because the bulk load + // finishes.... therefore split could propogate load flags for a finished bulk load... there is a special iterator + // on the !METADATA table to clean up this type of garbage + Map bulkLoadedFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent); + MetadataTable.splitTablet(high, extent.getPrevEndRow(), splitRatio, SecurityConstants.getSystemCredentials(), tabletServer.getLock()); - MetadataTable.addNewTablet(low, lowDirectory, tabletServer.getTabletSession(), lowDatafileSizes, SecurityConstants.getSystemCredentials(), time, - lastFlushID, lastCompactID, tabletServer.getLock()); + MetadataTable.addNewTablet(low, lowDirectory, tabletServer.getTabletSession(), lowDatafileSizes, bulkLoadedFiles, + SecurityConstants.getSystemCredentials(), time, lastFlushID, lastCompactID, tabletServer.getLock()); MetadataTable.finishSplit(high, highDatafileSizes, highDatafilesToRemove, SecurityConstants.getSystemCredentials(), tabletServer.getLock()); log.log(TLevel.TABLET_HIST, extent + " split " + low + " " + high); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1236413&r1=1236412&r2=1236413&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java Thu Jan 26 22:13:35 2012 @@ -163,9 +163,11 @@ public class SplitRecoveryTest extends F m.put(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY, assignment.server.asColumnQualifier(), assignment.server.asMutationValue()); writer.update(m); - if (steps >= 1) - MetadataTable.addNewTablet(low, "/lowDir", instance, lowDatafileSizes, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID + "0", -1l, - -1l, zl); + if (steps >= 1) { + Map bulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent); + MetadataTable.addNewTablet(low, "/lowDir", instance, lowDatafileSizes, bulkFiles, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID + + "0", -1l, -1l, zl); + } if (steps >= 2) MetadataTable.finishSplit(high, highDatafileSizes, highDatafilesToRemove, SecurityConstants.getSystemCredentials(), zl); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java?rev=1236413&r1=1236412&r2=1236413&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java Thu Jan 26 22:13:35 2012 @@ -54,8 +54,10 @@ public class BulkPlusOne extends BulkTes } } public static final Text MARKER_CF = new Text("marker"); - private static final AtomicLong counter = new AtomicLong(); + static final AtomicLong counter = new AtomicLong(); + private static final Value ONE = new Value("1".getBytes()); + static void bulkLoadLots(Logger log, State state, Value value) throws Exception { final Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString()); final Path fail = new Path(dir.toString() + "_fail"); @@ -74,8 +76,8 @@ public class BulkPlusOne extends BulkTes for (Integer row : startRows) printRows.add(String.format(FMT, row)); - String markerColumnFamily = Long.toString(counter.incrementAndGet()); - log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnFamily); + String markerColumnQualifier = String.format("%07d", counter.incrementAndGet()); + log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier); List rows = new ArrayList(startRows); rows.add(LOTS); @@ -91,7 +93,7 @@ public class BulkPlusOne extends BulkTes for (Column col : COLNAMES) { f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), value); } - f.append(new Key(row, MARKER_CF, new Text(markerColumnFamily)), value); + f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE); } f.close(); } @@ -101,7 +103,7 @@ public class BulkPlusOne extends BulkTes FileStatus[] failures = fs.listStatus(fail); if (failures != null && failures.length > 0) throw new Exception("Failures " + Arrays.asList(failures) + " found importing files from " + dir); - log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnFamily); + log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier); } @Override Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java?rev=1236413&r1=1236412&r2=1236413&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java Thu Jan 26 22:13:35 2012 @@ -57,7 +57,7 @@ public class Setup extends Test { tableOps.create(getTableName()); IteratorSetting is = new IteratorSetting(10, org.apache.accumulo.core.iterators.user.SummingCombiner.class); SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING); - SummingCombiner.setColumns(is, BulkPlusOne.COLNAMES); + SummingCombiner.setCombineAllColumns(is, true); tableOps.attachIterator(getTableName(), is); } } catch (TableExistsException ex) { @@ -65,6 +65,7 @@ public class Setup extends Test { } state.set("rand", rand); state.set("fs", FileSystem.get(CachedConfiguration.getInstance())); + BulkPlusOne.counter.set(0l); BlockingQueue q = new LinkedBlockingQueue(); ThreadFactory factory = new ThreadFactory() { Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java?rev=1236413&r1=1236412&r2=1236413&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java Thu Jan 26 22:13:35 2012 @@ -17,6 +17,7 @@ package org.apache.accumulo.server.test.randomwalk.bulk; import java.util.Arrays; +import java.util.Iterator; import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.ThreadPoolExecutor; @@ -24,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.conf.DefaultConfiguration; @@ -58,6 +60,37 @@ public class Verify extends Test { throw new Exception("Bad key at " + entry); } } + + scanner.clearColumns(); + scanner.fetchColumnFamily(BulkPlusOne.MARKER_CF); + RowIterator rowIter = new RowIterator(scanner); + + while (rowIter.hasNext()) { + Iterator> row = rowIter.next(); + long prev = 0; + Text rowText = null; + while (row.hasNext()) { + Entry entry = row.next(); + + if (rowText == null) + rowText = entry.getKey().getRow(); + + long curr = Long.valueOf(entry.getKey().getColumnQualifier().toString()); + + if (curr - 1 != prev) + throw new Exception("Bad marker count " + entry.getKey() + " " + entry.getValue() + " " + prev); + + if (!entry.getValue().toString().equals("1")) + throw new Exception("Bad marker value " + entry.getKey() + " " + entry.getValue()); + + prev = curr; + } + + if (BulkPlusOne.counter.get() != prev) { + throw new Exception("Row " + rowText + " does not have all markers " + BulkPlusOne.counter.get() + " " + prev); + } + } + log.info("Test successful on table " + Setup.getTableName()); state.getConnector().tableOperations().delete(Setup.getTableName()); } Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java?rev=1236413&r1=1236412&r2=1236413&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java Thu Jan 26 22:13:35 2012 @@ -46,6 +46,7 @@ import org.apache.accumulo.server.Server import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.constraints.MetadataConstraints; +import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter; import org.apache.accumulo.server.master.state.tables.TableManager; import org.apache.accumulo.server.security.SecurityConstants; import org.apache.accumulo.server.security.ZKAuthenticator; @@ -92,6 +93,7 @@ public class Initialize { initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1"); initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10," + VersioningIterator.class.getName()); initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1"); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName()); initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false"); initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet", String.format("%s,%s", Constants.METADATA_TABLET_COLUMN_FAMILY.toString(), Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY.toString())); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1236413&r1=1236412&r2=1236413&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Thu Jan 26 22:13:35 2012 @@ -417,8 +417,8 @@ public class MetadataTable extends org.a return sizes; } - public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map datafileSizes, AuthInfo credentials, - String time, long lastFlushID, long lastCompactID, ZooLock zooLock) { + public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map datafileSizes, + Map bulkLoadedFiles, AuthInfo credentials, String time, long lastFlushID, long lastCompactID, ZooLock zooLock) { Mutation m = extent.getPrevRowUpdateMutation(); ColumnFQ.put(m, Constants.METADATA_DIRECTORY_COLUMN, new Value(path.getBytes())); @@ -437,6 +437,11 @@ public class MetadataTable extends org.a m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(entry.getValue().encode())); } + for (Entry entry : bulkLoadedFiles.entrySet()) { + byte[] tidBytes = Long.toString(entry.getValue()).getBytes(); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(tidBytes)); + } + update(credentials, zooLock, m); } @@ -594,7 +599,8 @@ public class MetadataTable extends org.a if (!scanner2.iterator().hasNext()) { log.debug("Prev tablet " + prevRowKey + " does not exist, need to create it " + metadataPrevEndRow + " " + prevPrevEndRow + " " + splitRatio); - MetadataTable.addNewTablet(low, lowDirectory, tserver, lowDatafileSizes, credentials, time, initFlushID, initCompactID, lock); + Map bulkFiles = getBulkFilesLoaded(credentials, metadataEntry); + MetadataTable.addNewTablet(low, lowDirectory, tserver, lowDatafileSizes, bulkFiles, credentials, time, initFlushID, initCompactID, lock); } else { log.debug("Prev tablet " + prevRowKey + " exist, do not need to add it"); } @@ -1245,6 +1251,26 @@ public class MetadataTable extends org.a } } + public static Map getBulkFilesLoaded(AuthInfo credentials, KeyExtent extent) { + return getBulkFilesLoaded(credentials, extent.getMetadataEntry()); + } + + public static Map getBulkFilesLoaded(AuthInfo credentials, Text metadataRow) { + + Map ret = new HashMap(); + + Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS); + scanner.setRange(new Range(metadataRow)); + scanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY); + for (Entry entry : scanner) { + String file = entry.getKey().getColumnQualifier().toString(); + Long tid = Long.parseLong(entry.getValue().toString()); + + ret.put(file, tid); + } + return ret; + } + public static void addBulkLoadInProgressFlag(String path) { Mutation m = new Mutation(Constants.METADATA_BLIP_FLAG_PREFIX + path);