accumulo-commits mailing list archives

From kturner@apache.org
Subject [accumulo] branch master updated: fixes #472 Enabled bulk imports into offline table (#506)
Date Fri, 15 Jun 2018 14:49:46 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 9127ea7  fixes #472 Enabled bulk imports into offline table (#506)
9127ea7 is described below

commit 9127ea7e156bec015a4fd18178a75ae8a1e84211
Author: Keith Turner <kturner@apache.org>
AuthorDate: Fri Jun 15 10:29:23 2018 -0400

    fixes #472 Enabled bulk imports into offline table (#506)
---
 .../core/client/admin/TableOperations.java         |   3 +
 .../accumulo/core/client/impl/BulkImport.java      |  34 +--
 .../core/client/impl/ConcurrentKeyExtentCache.java | 138 +++++++++++
 .../client/impl/ConcurrentKeyExtentCacheTest.java  | 130 ++++++++++
 .../master/tableOps/bulkVer2/BulkImportMove.java   |   5 +-
 .../master/tableOps/bulkVer2/BulkInfo.java         |   2 +
 .../tableOps/bulkVer2/CleanUpBulkImport.java       |  34 +--
 .../tableOps/bulkVer2/CompleteBulkImport.java      |  13 +-
 .../master/tableOps/bulkVer2/LoadFiles.java        | 246 ++++++++++++++-----
 .../master/tableOps/bulkVer2/PrepBulkImport.java   |  52 ++--
 .../accumulo/test/functional/BulkLoadIT.java       | 261 +++++++++++++--------
 11 files changed, 690 insertions(+), 228 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index 1c09dcd..894b2fc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -676,6 +676,9 @@ public interface TableOperations {
    * lock. The old bulk import method ({@link #importDirectory(String, String, String, boolean)})
    * examines files on the server side while holding a table read lock.
    *
+   * <p>
+   * This API supports adding files to online and offline tables.
+   *
    * @since 2.0.0
    */
   default ImportSourceArguments addFilesTo(String tableName) {
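
The fluent chain documented above is exercised by the BulkLoadIT changes later
in this commit. A minimal usage sketch of the new offline capability, assuming
an existing Connector, a created table, and a directory of pre-staged RFiles
(the wrapper method and its names are illustrative, not part of this commit):

    import org.apache.accumulo.core.client.Connector;

    class OfflineBulkImportSketch {
      // Take the table offline, bulk import into it, then bring it back online.
      static void importOffline(Connector conn, String table, String dir) throws Exception {
        conn.tableOperations().offline(table, true);               // wait for offline
        conn.tableOperations().addFilesTo(table).from(dir).load(); // new bulk import API
        conn.tableOperations().online(table, true);                // wait for online
      }
    }
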
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
index d8decd5..8c3abd5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
@@ -50,7 +50,6 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.TableOperations.ImportExecutorOptions;
 import org.apache.accumulo.core.client.admin.TableOperations.ImportSourceArguments;
 import org.apache.accumulo.core.client.admin.TableOperations.ImportSourceOptions;
-import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
@@ -72,7 +71,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
 
 public class BulkImport implements ImportSourceArguments, ImportExecutorOptions {
 
@@ -245,10 +243,15 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
     return results;
   }
 
-  public static List<TabletLocation> findOverlappingTablets(ClientContext context,
-      TabletLocator locator, Text startRow, Text endRow, FileSKVIterator reader)
+  public interface KeyExtentCache {
+    KeyExtent lookup(Text row)
+        throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException;
+  }
+
+  public static List<KeyExtent> findOverlappingTablets(ClientContext context,
+      KeyExtentCache extentCache, Text startRow, Text endRow, FileSKVIterator reader)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
-    List<TabletLocation> result = new ArrayList<>();
+    List<KeyExtent> result = new ArrayList<>();
     Collection<ByteSequence> columnFamilies = Collections.emptyList();
     Text row = startRow;
     if (row == null)
@@ -261,10 +264,10 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
         break;
       }
       row = reader.getTopKey().getRow();
-      TabletLocation tabletLocation = locator.locateTablet(context, row, false, true);
+      KeyExtent extent = extentCache.lookup(row);
       // log.debug(filename + " found row " + row + " at location " + tabletLocation);
-      result.add(tabletLocation);
-      row = tabletLocation.tablet_extent.getEndRow();
+      result.add(extent);
+      row = extent.getEndRow();
       if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
         row = new Text(row);
         row.append(byte0, 0, byte0.length);
@@ -275,19 +278,20 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
     return result;
   }
 
-  public static List<TabletLocation> findOverlappingTablets(ClientContext context,
-      TabletLocator locator, Path file, FileSystem fs)
+  public static List<KeyExtent> findOverlappingTablets(ClientContext context,
+      KeyExtentCache extentCache, Path file, FileSystem fs)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
     try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
         .forFile(file.toString(), fs, fs.getConf())
         .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) {
-      return findOverlappingTablets(context, locator, null, null, reader);
+      return findOverlappingTablets(context, extentCache, null, null, reader);
     }
   }
 
   public static SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs,
       Table.ID tableId, Path dirPath, Executor executor, ClientContext context) throws IOException {
-    TabletLocator locator = TabletLocator.getLocator(context, tableId);
+
+    KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context);
 
     FileStatus[] files = fs.listStatus(dirPath,
         p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING));
@@ -298,14 +302,12 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
       CompletableFuture<Map<KeyExtent,Bulk.FileInfo>> future = CompletableFuture.supplyAsync(() -> {
         try {
           long t1 = System.currentTimeMillis();
-          List<TabletLocation> locations = findOverlappingTablets(context, locator,
+          List<KeyExtent> extents = findOverlappingTablets(context, extentCache,
               fileStatus.getPath(), fs);
-          Collection<KeyExtent> extents = Collections2.transform(locations, l -> l.tablet_extent);
           Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(),
               fileStatus.getPath(), fileStatus.getLen(), extents, fs);
           Map<KeyExtent,Bulk.FileInfo> pathLocations = new HashMap<>();
-          for (TabletLocation location : locations) {
-            KeyExtent ke = location.tablet_extent;
+          for (KeyExtent ke : extents) {
             pathLocations.put(ke,
                 new Bulk.FileInfo(fileStatus.getPath(), estSizes.getOrDefault(ke, 0L)));
           }
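
The findOverlappingTablets loop above walks a file and the tablet space in
lockstep: read the first key at or after row, look up the extent containing
that row through the new KeyExtentCache, record it, then jump to the first row
strictly after the extent's end row (end row plus a zero byte). The same walk
over plain string ranges, as a self-contained toy (illustrative only; assumes
the tablets cover the whole row space):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeSet;

    class OverlapWalkSketch {
      // Toy extent: covers the range (prevEndRow, endRow]; null endRow = last tablet.
      static class Extent {
        final String prevEndRow, endRow;
        Extent(String prev, String end) { prevEndRow = prev; endRow = end; }
        boolean contains(String row) {
          return (prevEndRow == null || row.compareTo(prevEndRow) > 0)
              && (endRow == null || row.compareTo(endRow) <= 0);
        }
      }

      static List<Extent> findOverlapping(TreeSet<String> fileRows, List<Extent> tablets) {
        List<Extent> result = new ArrayList<>();
        String row = fileRows.isEmpty() ? null : fileRows.first();
        while (row != null) {
          Extent e = null;
          for (Extent t : tablets)                   // stands in for extentCache.lookup(row)
            if (t.contains(row)) { e = t; break; }
          result.add(e);
          if (e.endRow == null)
            break;                                   // walked off the last tablet
          row = fileRows.ceiling(e.endRow + "\0");   // first file row strictly after endRow
        }
        return result;
      }
    }
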
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCache.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCache.java
new file mode 100644
index 0000000..71dedee
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCache.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.BulkImport.KeyExtentCache;
+import org.apache.accumulo.core.client.impl.Table.ID;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.MetadataScanner;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@VisibleForTesting
+class ConcurrentKeyExtentCache implements KeyExtentCache {
+
+  private static final Text MAX = new Text();
+
+  private Set<Text> rowsToLookup = Collections.synchronizedSet(new HashSet<>());
+
+  List<Text> lookupRows = new ArrayList<>();
+
+  private ConcurrentSkipListMap<Text,KeyExtent> extents = new ConcurrentSkipListMap<>((t1, t2) -> {
+    return (t1 == t2) ? 0 : (t1 == MAX ? 1 : (t2 == MAX ? -1 : t1.compareTo(t2)));
+  });
+  private ID tableId;
+  private ClientContext ctx;
+
+  @VisibleForTesting
+  ConcurrentKeyExtentCache(Table.ID tableId, ClientContext ctx) {
+    this.tableId = tableId;
+    this.ctx = ctx;
+  }
+
+  private KeyExtent getFromCache(Text row) {
+    Entry<Text,KeyExtent> entry = extents.ceilingEntry(row);
+    if (entry != null && entry.getValue().contains(row)) {
+      return entry.getValue();
+    }
+
+    return null;
+  }
+
+  private boolean inCache(KeyExtent e) {
+    return Objects.equals(e, extents.get(e.getEndRow() == null ? MAX : e.getEndRow()));
+  }
+
+  @VisibleForTesting
+  protected void updateCache(KeyExtent e) {
+    Text prevRow = e.getPrevEndRow() == null ? new Text() : e.getPrevEndRow();
+    Text endRow = e.getEndRow() == null ? MAX : e.getEndRow();
+    extents.subMap(prevRow, e.getPrevEndRow() == null, endRow, true).clear();
+    extents.put(endRow, e);
+  }
+
+  @VisibleForTesting
+  protected Stream<KeyExtent> lookupExtents(Text row)
+      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    return MetadataScanner.builder().from(ctx).scanMetadataTable().overRange(tableId, row, null)
+        .checkConsistency().fetchPrev().build().stream().limit(100).map(TabletMetadata::getExtent);
+  }
+
+  @Override
+  public KeyExtent lookup(Text row)
+      throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    while (true) {
+      KeyExtent ke = getFromCache(row);
+      if (ke != null)
+        return ke;
+
+      // If a metadata lookup is currently in progress, then multiple threads can queue up their
+      // rows. The next lookup will process all queued. Processing multiple at once can be more
+      // efficient.
+      rowsToLookup.add(row);
+
+      synchronized (this) {
+        // This check is done to avoid processing rowsToLookup when the current thread's row is in
+        // the cache.
+        ke = getFromCache(row);
+        if (ke != null) {
+          rowsToLookup.remove(row);
+          return ke;
+        }
+
+        lookupRows.clear();
+        synchronized (rowsToLookup) {
+          // Gather all rows that were queued for lookup before this point in time.
+          rowsToLookup.forEach(lookupRows::add);
+          rowsToLookup.clear();
+        }
+        // Lookup rows in the metadata table in sorted order. This could possibly lead to less
+        // metadata lookups.
+        lookupRows.sort(Text::compareTo);
+
+        for (Text lookupRow : lookupRows) {
+          if (getFromCache(lookupRow) == null) {
+            Iterator<KeyExtent> iter = lookupExtents(lookupRow).iterator();
+            while (iter.hasNext()) {
+              KeyExtent ke2 = iter.next();
+              if (inCache(ke2))
+                break;
+              updateCache(ke2);
+            }
+          }
+        }
+      }
+    }
+  }
+}
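
The lookup method above pairs a lock-free read path with a "queue, then batch
under one lock" miss path: threads drop their rows into rowsToLookup, and
whichever thread holds the monitor drains and resolves everything queued so
far, so N concurrent misses can be satisfied by one metadata scan instead of N.
The real class additionally sorts the batch and caches whole runs of extents
from each range scan; the concurrency skeleton alone, distilled into a generic
sketch (all names invented):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    class BatchingCacheSketch<K, V> {
      private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
      private final Set<K> pending = Collections.synchronizedSet(new HashSet<>());
      private final Function<List<K>, Map<K, V>> batchLoader;

      BatchingCacheSketch(Function<List<K>, Map<K, V>> batchLoader) {
        this.batchLoader = batchLoader;
      }

      V lookup(K key) {
        while (true) {
          V v = cache.get(key);                 // fast path, no lock
          if (v != null)
            return v;
          pending.add(key);                     // queue while another batch may be running
          synchronized (this) {
            v = cache.get(key);                 // re-check: a finished batch may cover us
            if (v != null) {
              pending.remove(key);
              return v;
            }
            List<K> batch = new ArrayList<>();
            synchronized (pending) {            // drain everything queued so far
              batch.addAll(pending);
              pending.clear();
            }
            cache.putAll(batchLoader.apply(batch)); // one bulk load for the whole batch
          }
        }
      }
    }
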
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCacheTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCacheTest.java
new file mode 100644
index 0000000..45d9456
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCacheTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+public class ConcurrentKeyExtentCacheTest {
+
+  private static List<KeyExtent> extents = new ArrayList<>();
+  private static Set<KeyExtent> extentsSet = new HashSet<>();
+
+  @BeforeClass
+  public static void setupSplits() {
+    Text prev = null;
+    for (int i = 1; i < 256; i++) {
+      Text endRow = new Text(String.format("%02x", i));
+      extents.add(new KeyExtent(Table.ID.of("1"), endRow, prev));
+      prev = endRow;
+    }
+
+    extents.add(new KeyExtent(Table.ID.of("1"), null, prev));
+
+    extentsSet.addAll(extents);
+  }
+
+  private static class TestCache extends ConcurrentKeyExtentCache {
+
+    AtomicInteger updates = new AtomicInteger(0);
+
+    TestCache() {
+      super(null, null);
+    }
+
+    @Override
+    protected void updateCache(KeyExtent e) {
+      super.updateCache(e);
+      updates.incrementAndGet();
+    }
+
+    @Override
+    protected Stream<KeyExtent> lookupExtents(Text row) {
+      int index = -1;
+      for (int i = 0; i < extents.size(); i++) {
+        if (extents.get(i).contains(row)) {
+          index = i;
+          break;
+        }
+      }
+
+      Uninterruptibles.sleepUninterruptibly(3, TimeUnit.MILLISECONDS);
+
+      return extents.subList(index, extents.size()).stream().limit(73);
+    }
+  }
+
+  private void testLookup(TestCache tc, Text lookupRow) {
+    try {
+      KeyExtent extent = tc.lookup(lookupRow);
+      Assert.assertTrue(extent.contains(lookupRow));
+      Assert.assertTrue(extentsSet.contains(extent));
+    } catch (IOException | AccumuloException | AccumuloSecurityException
+        | TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testExactEndRows() {
+    Random rand = new Random(42);
+    TestCache tc = new TestCache();
+    rand.ints(10000, 0, 256).mapToObj(i -> new Text(String.format("%02x", i))).sequential()
+        .forEach(lookupRow -> testLookup(tc, lookupRow));
+    Assert.assertEquals(256, tc.updates.get());
+
+    // try parallel
+    TestCache tc2 = new TestCache();
+    rand.ints(10000, 0, 256).mapToObj(i -> new Text(String.format("%02x", i))).parallel()
+        .forEach(lookupRow -> testLookup(tc2, lookupRow));
+    Assert.assertEquals(256, tc2.updates.get());
+  }
+
+  @Test
+  public void testRandom() throws Exception {
+    TestCache tc = new TestCache();
+
+    Random rand = new Random(42);
+    rand.ints(10000).mapToObj(i -> new Text(String.format("%08x", i))).sequential()
+        .forEach(lookupRow -> testLookup(tc, lookupRow));
+    Assert.assertEquals(256, tc.updates.get());
+
+    // try parallel
+    TestCache tc2 = new TestCache();
+    rand.ints(10000).mapToObj(i -> new Text(String.format("%08x", i))).parallel()
+        .forEach(lookupRow -> testLookup(tc2, lookupRow));
+    Assert.assertEquals(256, tc2.updates.get());
+  }
+}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
index 71abea8..c2de4dd 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.impl.BulkSerialize;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
@@ -75,7 +76,9 @@ class BulkImportMove extends MasterRepo {
 
     VolumeManager fs = master.getFileSystem();
 
-    ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
+    if (bulkInfo.tableState == TableState.ONLINE) {
+      ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
+    }
 
     try {
       Map<String,String> oldToNewNameMap = BulkSerialize.readRenameMap(bulkDir.toString(),
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkInfo.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkInfo.java
index 6c94e20..657f9af 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkInfo.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkInfo.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.master.tableOps.bulkVer2;
 import java.io.Serializable;
 
 import org.apache.accumulo.core.client.impl.Table;
+import org.apache.accumulo.core.master.state.tables.TableState;
 
 /**
  * Package private class to hold all the information used for bulk import2
@@ -30,4 +31,5 @@ class BulkInfo implements Serializable {
   String sourceDir;
   String bulkDir;
   boolean setTime;
+  TableState tableState;
 }
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
index c912f51..3fec43e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -20,7 +20,7 @@ import java.io.IOException;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.impl.Table;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -37,28 +37,26 @@ public class CleanUpBulkImport extends MasterRepo {
 
   private static final Logger log = LoggerFactory.getLogger(CleanUpBulkImport.class);
 
-  private Table.ID tableId;
-  private String source;
-  private String bulk;
+  private BulkInfo info;
 
-  public CleanUpBulkImport(Table.ID tableId, String source, String bulk) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
+  public CleanUpBulkImport(BulkInfo info) {
+    this.info = info;
   }
 
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
-    log.debug("removing the bulkDir processing flag file in " + bulk);
-    Path bulkDir = new Path(bulk);
+    log.debug("removing the bulkDir processing flag file in " + info.bulkDir);
+    Path bulkDir = new Path(info.bulkDir);
     MetadataTableUtil.removeBulkLoadInProgressFlag(master,
         "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-    MetadataTableUtil.addDeleteEntry(master, tableId, bulkDir.toString());
-    log.debug("removing the metadata table markers for loaded files");
-    Connector conn = master.getConnector();
-    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
-    Utils.unreserveHdfsDirectory(source, tid);
-    Utils.getReadLock(tableId, tid).unlock();
+    MetadataTableUtil.addDeleteEntry(master, info.tableId, bulkDir.toString());
+    if (info.tableState == TableState.ONLINE) {
+      log.debug("removing the metadata table markers for loaded files");
+      Connector conn = master.getConnector();
+      MetadataTableUtil.removeBulkLoadEntries(conn, info.tableId, tid);
+    }
+    Utils.unreserveHdfsDirectory(info.sourceDir, tid);
+    Utils.getReadLock(info.tableId, tid).unlock();
     // delete json renames and mapping files
     Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE);
     Path mappingFile = new Path(bulkDir, Constants.BULK_LOAD_MAPPING);
@@ -70,7 +68,9 @@ public class CleanUpBulkImport extends MasterRepo {
     }
 
     log.debug("completing bulkDir import transaction " + tid);
-    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
+    if (info.tableState == TableState.ONLINE) {
+      ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
+    }
     return null;
   }
 }
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CompleteBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CompleteBulkImport.java
index 02a9848..ac7a8a5 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CompleteBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CompleteBulkImport.java
@@ -17,7 +17,6 @@
 package org.apache.accumulo.master.tableOps.bulkVer2;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.impl.Table;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -27,19 +26,15 @@ public class CompleteBulkImport extends MasterRepo {
 
   private static final long serialVersionUID = 1L;
 
-  private Table.ID tableId;
-  private String source;
-  private String bulk;
+  private BulkInfo info;
 
-  public CompleteBulkImport(Table.ID tableId, String source, String bulk) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
+  public CompleteBulkImport(BulkInfo info) {
+    this.info = info;
   }
 
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
-    return new CleanUpBulkImport(tableId, source, bulk);
+    return new CleanUpBulkImport(info);
   }
 }
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
index 390a3ea..dcf9721 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
@@ -16,32 +16,40 @@
  */
 package org.apache.accumulo.master.tableOps.bulkVer2;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.impl.Bulk;
 import org.apache.accumulo.core.client.impl.Bulk.Files;
 import org.apache.accumulo.core.client.impl.BulkSerialize;
 import org.apache.accumulo.core.client.impl.BulkSerialize.LoadMappingIterator;
 import org.apache.accumulo.core.client.impl.Table;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.MapFileInfo;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataScanner;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -52,6 +60,8 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Make asynchronous load calls to each overlapping Tablet. This RepO does its work on the isReady
  * and will return a linear sleep value based on the largest number of Tablets on a TabletServer.
@@ -84,74 +94,58 @@ class LoadFiles extends MasterRepo {
 
   @Override
   public Repo<Master> call(final long tid, final Master master) throws Exception {
-    return new CompleteBulkImport(bulkInfo.tableId, bulkInfo.sourceDir, bulkInfo.bulkDir);
-  }
-
-  static boolean equals(Text t1, Text t2) {
-    if (t1 == null || t2 == null)
-      return t1 == t2;
-
-    return t1.equals(t2);
+    if (bulkInfo.tableState == TableState.ONLINE) {
+      return new CompleteBulkImport(bulkInfo);
+    } else {
+      return new CleanUpBulkImport(bulkInfo);
+    }
   }
 
-  /**
-   * Make asynchronous load calls to each overlapping Tablet in the bulk mapping. Return a sleep
-   * time to isReady based on a factor of the TabletServer with the most Tablets. This method will
-   * scan the metadata table getting Tablet range and location information. It will return 0 when
-   * all files have been loaded.
-   */
-  private long loadFiles(Table.ID tableId, Path bulkDir, LoadMappingIterator lmi, Master master,
-      long tid) throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
-
-    Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.next();
-
-    Text startRow = loadMapEntry.getKey().getPrevEndRow();
-
-    long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-    Iterator<TabletMetadata> tabletIter = MetadataScanner.builder().from(master).scanMetadataTable()
-        .overRange(tableId, startRow, null).checkConsistency().fetchPrev().fetchLocation()
-        .fetchLoaded().build().iterator();
+  private static abstract class Loader {
+    protected Path bulkDir;
+    protected Master master;
+    protected long tid;
+    protected boolean setTime;
+
+    void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
+      this.bulkDir = bulkDir;
+      this.master = master;
+      this.tid = tid;
+      this.setTime = setTime;
+    }
 
-    List<TabletMetadata> tablets = new ArrayList<>();
-    TabletMetadata currentTablet = tabletIter.next();
-    HostAndPort server = null;
+    abstract void load(List<TabletMetadata> tablets, Files files) throws Exception;
 
-    // track how many tablets were sent load messages per tablet server
-    MapCounter<HostAndPort> tabletsPerServer = new MapCounter<>();
+    abstract long finish() throws Exception;
+  }
 
-    String fmtTid = String.format("%016x", tid);
+  private static class OnlineLoader extends Loader {
 
+    long timeInMillis;
+    String fmtTid;
     int locationLess = 0;
 
-    long t1 = System.currentTimeMillis();
-    while (true) {
-      if (loadMapEntry == null) {
-        if (!lmi.hasNext()) {
-          break;
-        }
-        loadMapEntry = lmi.next();
-      }
-      KeyExtent fke = loadMapEntry.getKey();
-      Files files = loadMapEntry.getValue();
-      loadMapEntry = null;
+    // track how many tablets were sent load messages per tablet server
+    MapCounter<HostAndPort> loadMsgs;
 
-      tablets.clear();
+    @Override
+    void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
+      super.start(bulkDir, master, tid, setTime);
 
-      while (!equals(currentTablet.getPrevEndRow(), fke.getPrevEndRow())) {
-        currentTablet = tabletIter.next();
-      }
-      tablets.add(currentTablet);
+      timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+      fmtTid = String.format("%016x", tid);
 
-      while (!equals(currentTablet.getEndRow(), fke.getEndRow())) {
-        currentTablet = tabletIter.next();
-        tablets.add(currentTablet);
-      }
+      loadMsgs = new MapCounter<>();
+    }
 
+    @Override
+    void load(List<TabletMetadata> tablets, Files files) {
       for (TabletMetadata tablet : tablets) {
         // send files to tablet sever
         // ideally there should only be one tablet location to send all the files
 
         TabletMetadata.Location location = tablet.getLocation();
+        HostAndPort server = null;
         if (location == null) {
           locationLess++;
           continue;
@@ -174,14 +168,14 @@ class LoadFiles extends MasterRepo {
         if (thriftImports.size() > 0) {
           // must always increment this even if there is a comms failure, because it indicates there
           // is work to do
-          tabletsPerServer.increment(server, 1);
+          loadMsgs.increment(server, 1);
           log.trace("tid {} asking {} to bulk import {} files", fmtTid, server,
               thriftImports.size());
           TabletClientService.Client client = null;
           try {
             client = ThriftUtil.getTServerClient(server, master, timeInMillis);
-            client.loadFiles(Tracer.traceInfo(), master.rpcCreds(), tid, fke.toThrift(),
-                bulkDir.toString(), thriftImports, bulkInfo.setTime);
+            client.loadFiles(Tracer.traceInfo(), master.rpcCreds(), tid,
+                tablet.getExtent().toThrift(), bulkDir.toString(), thriftImports, setTime);
           } catch (TException ex) {
             log.debug("rpc failed server: " + server + ", tid:" + fmtTid + " " + ex.getMessage(),
                 ex);
@@ -190,20 +184,142 @@ class LoadFiles extends MasterRepo {
           }
         }
       }
+
     }
-    long t2 = System.currentTimeMillis();
 
-    long sleepTime = 0;
-    if (tabletsPerServer.size() > 0) {
-      // find which tablet server had the most load messages sent to it and sleep 13ms for each load
-      // message
-      sleepTime = Collections.max(tabletsPerServer.values()) * 13;
+    @Override
+    long finish() {
+      long sleepTime = 0;
+      if (loadMsgs.size() > 0) {
+        // find which tablet server had the most load messages sent to it and sleep 13ms for each
+        // load message
+        sleepTime = Collections.max(loadMsgs.values()) * 13;
+      }
+
+      if (locationLess > 0) {
+        sleepTime = Math.max(Math.max(100L, locationLess), sleepTime);
+      }
+
+      return sleepTime;
     }
 
-    if (locationLess > 0) {
-      sleepTime = Math.max(100, Math.max(2 * (t2 - t1), sleepTime));
+  }
+
+  private static class OfflineLoader extends Loader {
+
+    BatchWriter bw;
+
+    // track how many tablets were sent load messages per tablet server
+    MapCounter<HostAndPort> unloadingTablets;
+
+    @Override
+    void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
+      Preconditions.checkArgument(!setTime);
+      super.start(bulkDir, master, tid, setTime);
+      bw = master.getConnector().createBatchWriter(MetadataTable.NAME);
+      unloadingTablets = new MapCounter<>();
     }
 
+    @Override
+    void load(List<TabletMetadata> tablets, Files files) throws MutationsRejectedException {
+      byte[] fam = TextUtil.getBytes(DataFileColumnFamily.NAME);
+
+      for (TabletMetadata tablet : tablets) {
+        if (tablet.getLocation() != null) {
+          unloadingTablets.increment(tablet.getLocation().getHostAndPort(), 1L);
+          continue;
+        }
+
+        Mutation mutation = new Mutation(tablet.getExtent().getMetadataEntry());
+
+        for (final Bulk.FileInfo fileInfo : files) {
+          String fullPath = new Path(bulkDir, fileInfo.getFileName()).toString();
+          byte[] val = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries())
+              .encode();
+          mutation.put(fam, fullPath.getBytes(UTF_8), val);
+        }
+
+        bw.addMutation(mutation);
+      }
+    }
+
+    @Override
+    long finish() throws Exception {
+
+      bw.close();
+
+      long sleepTime = 0;
+      if (unloadingTablets.size() > 0) {
+        // find which tablet server had the most tablets to unload and sleep 13ms for each tablet
+        sleepTime = Collections.max(unloadingTablets.values()) * 13;
+      }
+
+      return sleepTime;
+    }
+  }
+
+  /**
+   * Make asynchronous load calls to each overlapping Tablet in the bulk mapping. Return a sleep
+   * time to isReady based on a factor of the TabletServer with the most Tablets. This method will
+   * scan the metadata table getting Tablet range and location information. It will return 0 when
+   * all files have been loaded.
+   */
+  private long loadFiles(Table.ID tableId, Path bulkDir, LoadMappingIterator lmi, Master master,
+      long tid) throws Exception {
+
+    Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.next();
+
+    Text startRow = loadMapEntry.getKey().getPrevEndRow();
+
+    Iterator<TabletMetadata> tabletIter = MetadataScanner.builder().from(master).scanMetadataTable()
+        .overRange(tableId, startRow, null).checkConsistency().fetchPrev().fetchLocation()
+        .fetchLoaded().build().iterator();
+
+    List<TabletMetadata> tablets = new ArrayList<>();
+    TabletMetadata currentTablet = tabletIter.next();
+
+    Loader loader;
+    if (bulkInfo.tableState == TableState.ONLINE) {
+      loader = new OnlineLoader();
+    } else {
+      loader = new OfflineLoader();
+    }
+
+    loader.start(bulkDir, master, tid, bulkInfo.setTime);
+
+    long t1 = System.currentTimeMillis();
+    while (true) {
+      if (loadMapEntry == null) {
+        if (!lmi.hasNext()) {
+          break;
+        }
+        loadMapEntry = lmi.next();
+      }
+      KeyExtent fke = loadMapEntry.getKey();
+      Files files = loadMapEntry.getValue();
+      loadMapEntry = null;
+
+      tablets.clear();
+
+      while (!Objects.equals(currentTablet.getPrevEndRow(), fke.getPrevEndRow())) {
+        currentTablet = tabletIter.next();
+      }
+      tablets.add(currentTablet);
+
+      while (!Objects.equals(currentTablet.getEndRow(), fke.getEndRow())) {
+        currentTablet = tabletIter.next();
+        tablets.add(currentTablet);
+      }
+
+      loader.load(tablets, files);
+    }
+    long t2 = System.currentTimeMillis();
+
+    long sleepTime = loader.finish();
+    if (sleepTime > 0) {
+      long scanTime = Math.min(t2 - t1, 30000);
+      sleepTime = Math.max(sleepTime, scanTime * 2);
+    }
     return sleepTime;
   }
 }
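
The refactor above turns loadFiles into a strategy dispatch: one metadata walk
feeds either an OnlineLoader, which sends loadFiles RPCs to hosting tablet
servers and backs off 13 ms per message sent to the busiest server, or an
OfflineLoader, which writes file entries straight into the metadata table and
backs off while tablets still have locations. The shape, reduced to a
self-contained sketch (all names invented):

    import java.util.List;

    class LoaderDispatchSketch {
      interface Loader {
        void load(String tablet) throws Exception; // per-tablet work
        long finish() throws Exception;            // back-off sleep in ms; 0 when done
      }

      static long run(boolean online, List<String> tablets) throws Exception {
        Loader loader = online
            ? new Loader() {                       // online: ask tablet servers to load
                int msgs = 0;
                public void load(String t) { msgs++; /* client.loadFiles(...) */ }
                public long finish() { return msgs * 13L; }
              }
            : new Loader() {                       // offline: write metadata entries directly
                public void load(String t) { /* bw.addMutation(...) */ }
                public long finish() { return 0; }
              };
        for (String t : tablets)
          loader.load(t);
        return loader.finish();
      }
    }
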
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
index bca1389..b97426c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
@@ -34,7 +36,6 @@ import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.metadata.schema.MetadataScanner;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.fate.Repo;
@@ -87,16 +88,8 @@ public class PrepBulkImport extends MasterRepo {
     if (master.onlineTabletServers().size() == 0)
       return 500;
     Tables.clearCache(master.getInstance());
-    if (Tables.getTableState(master.getInstance(), bulkInfo.tableId) == TableState.ONLINE) {
-      return Utils.reserveHdfsDirectory(bulkInfo.sourceDir, tid);
-    } else {
-      throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonicalID(), null,
-          TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
-    }
-  }
 
-  static boolean equals(Text t1, Text t2) {
-    return LoadFiles.equals(t1, t2);
+    return Utils.reserveHdfsDirectory(bulkInfo.sourceDir, tid);
   }
 
   @VisibleForTesting
@@ -104,49 +97,52 @@ public class PrepBulkImport extends MasterRepo {
     Iterator<KeyExtent> newTabletIter(Text startRow) throws Exception;
   }
 
+  private static boolean equals(Function<KeyExtent,Text> extractor, KeyExtent ke1, KeyExtent ke2) {
+    return Objects.equals(extractor.apply(ke1), extractor.apply(ke2));
+  }
+
   @VisibleForTesting
   static void checkForMerge(String tableId, Iterator<KeyExtent> lmi,
       TabletIterFactory tabletIterFactory) throws Exception {
-    KeyExtent currentRange = lmi.next();
+    KeyExtent currRange = lmi.next();
 
-    Text startRow = currentRange.getPrevEndRow();
+    Text startRow = currRange.getPrevEndRow();
 
     Iterator<KeyExtent> tabletIter = tabletIterFactory.newTabletIter(startRow);
 
-    KeyExtent currentTablet = tabletIter.next();
+    KeyExtent currTablet = tabletIter.next();
 
-    if (!tabletIter.hasNext() && equals(currentTablet.getPrevEndRow(), currentRange.getPrevEndRow())
-        && equals(currentTablet.getEndRow(), currentRange.getEndRow()))
-      currentRange = null;
+    if (!tabletIter.hasNext() && equals(KeyExtent::getPrevEndRow, currTablet, currRange)
+        && equals(KeyExtent::getEndRow, currTablet, currRange))
+      currRange = null;
 
     while (tabletIter.hasNext()) {
 
-      if (currentRange == null) {
+      if (currRange == null) {
         if (!lmi.hasNext()) {
           break;
         }
-        currentRange = lmi.next();
+        currRange = lmi.next();
       }
 
-      while (!equals(currentTablet.getPrevEndRow(), currentRange.getPrevEndRow())
-          && tabletIter.hasNext()) {
-        currentTablet = tabletIter.next();
+      while (!equals(KeyExtent::getPrevEndRow, currTablet, currRange) && tabletIter.hasNext()) {
+        currTablet = tabletIter.next();
       }
 
-      boolean matchedPrevRow = equals(currentTablet.getPrevEndRow(), currentRange.getPrevEndRow());
+      boolean matchedPrevRow = equals(KeyExtent::getPrevEndRow, currTablet, currRange);
 
-      while (!equals(currentTablet.getEndRow(), currentRange.getEndRow()) && tabletIter.hasNext()) {
-        currentTablet = tabletIter.next();
+      while (!equals(KeyExtent::getEndRow, currTablet, currRange) && tabletIter.hasNext()) {
+        currTablet = tabletIter.next();
       }
 
-      if (!matchedPrevRow || !equals(currentTablet.getEndRow(), currentRange.getEndRow())) {
+      if (!matchedPrevRow || !equals(KeyExtent::getEndRow, currTablet, currRange)) {
         break;
       }
 
-      currentRange = null;
+      currRange = null;
     }
 
-    if (currentRange != null || lmi.hasNext()) {
+    if (currRange != null || lmi.hasNext()) {
       // a merge happened between the time the mapping was generated and the table lock was
       // acquired
       throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
@@ -181,6 +177,8 @@ public class PrepBulkImport extends MasterRepo {
     // now that table lock is acquired check that all splits in load mapping exists in table
     checkForMerge(master);
 
+    bulkInfo.tableState = Tables.getTableState(master.getInstance(), bulkInfo.tableId);
+
     VolumeManager fs = master.getFileSystem();
     final UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
     Path sourceDir = new Path(bulkInfo.sourceDir);
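
The new equals(Function, KeyExtent, KeyExtent) helper above replaces the old
null-safe Text comparison with Objects.equals over an extracted field, which
reads cleanly at the call sites. The same idea in fully generic form (a sketch,
not part of this commit):

    import java.util.Objects;
    import java.util.function.Function;

    class FieldEqualsSketch {
      // Compare two objects by one extracted field, null-safely.
      static <T, R> boolean fieldEquals(Function<T, R> extractor, T a, T b) {
        return Objects.equals(extractor.apply(a), extractor.apply(b));
      }
      // usage: fieldEquals(KeyExtent::getEndRow, currTablet, currRange)
    }
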
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
index d97d1f8..7aa0b31 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
@@ -20,20 +20,35 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.fail;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Table;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.metadata.schema.MetadataScanner;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.minicluster.MemoryUnit;
@@ -46,8 +61,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
 /**
  * Tests new bulk import technique. If the old technique ever gets removed this will replace
  * {@link BulkFileIT}
@@ -69,82 +89,95 @@ public class BulkLoadIT extends AccumuloClusterHarness {
     return 4 * 60;
   }
 
-  @Test
-  public void testSingleTabletSingleFile() throws Exception {
+  private String tableName;
+  private AccumuloConfiguration aconf;
+  private FileSystem fs;
+  private String rootPath;
+
+  @Before
+  public void setupBulkTest() throws Exception {
     Connector c = getConnector();
-    String tableName = getUniqueNames(1)[0];
+    tableName = getUniqueNames(1)[0];
     c.tableOperations().create(tableName);
-    addSplits(tableName, "0333");
+    aconf = new ServerConfigurationFactory(c.getInstance()).getSystemConfiguration();
+    fs = getCluster().getFileSystem();
+    rootPath = cluster.getTemporaryPath().toString();
+  }
 
-    Configuration conf = new Configuration();
-    AccumuloConfiguration aconf = new ServerConfigurationFactory(c.getInstance())
-        .getSystemConfiguration();
-    FileSystem fs = getCluster().getFileSystem();
-    String rootPath = cluster.getTemporaryPath().toString();
+  private String getDir(String testName) throws Exception {
+    String dir = rootPath + testName + getUniqueNames(1)[0];
+    fs.delete(new Path(dir), true);
+    return dir;
+  }
 
-    String dir = rootPath + "/testSingleTabletSingleFileNoSplits-" + getUniqueNames(1)[0];
-    Path bulkDir = new Path(dir);
+  private void testSingleTabletSingleFile(boolean offline) throws Exception {
+    Connector c = getConnector();
+    addSplits(tableName, "0333");
+
+    if (offline)
+      c.tableOperations().offline(tableName);
 
-    fs.delete(bulkDir, true);
+    String dir = getDir("/testSingleTabletSingleFileNoSplits-");
 
-    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer1.startDefaultLocalityGroup();
-    writeData(writer1, 0, 332);
-    writer1.close();
+    String h1 = writeData(dir + "/f1.", aconf, 0, 332);
 
     c.tableOperations().addFilesTo(tableName).from(dir).load();
+
+    if (offline)
+      c.tableOperations().online(tableName);
+
+    verifyData(tableName, 0, 332);
+    verifyMetadata(tableName,
+        ImmutableMap.of("0333", ImmutableSet.of(h1), "null", ImmutableSet.of()));
   }
 
   @Test
-  public void testSingleTabletSingleFileNoSplits() throws Exception {
-    Connector c = getConnector();
-    String tableName = getUniqueNames(1)[0];
-    c.tableOperations().create(tableName);
+  public void testSingleTabletSingleFile() throws Exception {
+    testSingleTabletSingleFile(false);
+  }
 
-    Configuration conf = new Configuration();
-    AccumuloConfiguration aconf = new ServerConfigurationFactory(c.getInstance())
-        .getSystemConfiguration();
-    FileSystem fs = getCluster().getFileSystem();
-    String rootPath = cluster.getTemporaryPath().toString();
+  @Test
+  public void testSingleTabletSingleFileOffline() throws Exception {
+    testSingleTabletSingleFile(true);
+  }
 
-    String dir = rootPath + "/testSingleTabletSingleFileNoSplits-" + getUniqueNames(1)[0];
-    Path bulkDir = new Path(dir);
+  private void testSingleTabletSingleFileNoSplits(boolean offline) throws Exception {
+    Connector c = getConnector();
 
-    fs.delete(bulkDir, true);
+    if (offline)
+      c.tableOperations().offline(tableName);
 
-    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer1.startDefaultLocalityGroup();
-    writeData(writer1, 0, 333);
-    writer1.close();
+    String dir = getDir("/testSingleTabletSingleFileNoSplits-");
+
+    String h1 = writeData(dir + "/f1.", aconf, 0, 333);
 
     c.tableOperations().addFilesTo(tableName).from(dir).load();
+
+    if (offline)
+      c.tableOperations().online(tableName);
+
+    verifyData(tableName, 0, 333);
+    verifyMetadata(tableName, ImmutableMap.of("null", ImmutableSet.of(h1)));
+  }
+
+  @Test
+  public void testSingleTabletSingleFileNoSplits() throws Exception {
+    testSingleTabletSingleFileNoSplits(false);
+  }
+
+  @Test
+  public void testSingleTabletSingleFileNoSplitsOffline() throws Exception {
+    testSingleTabletSingleFileNoSplits(true);
   }
 
   @Test
   public void testBadPermissions() throws Exception {
     Connector c = getConnector();
-    String tableName = getUniqueNames(1)[0];
-    c.tableOperations().create(tableName);
     addSplits(tableName, "0333");
 
-    Configuration conf = new Configuration();
-    AccumuloConfiguration aconf = new ServerConfigurationFactory(c.getInstance())
-        .getSystemConfiguration();
-    FileSystem fs = getCluster().getFileSystem();
-    String rootPath = cluster.getTemporaryPath().toString();
-
-    String dir = rootPath + "/testBadPermissions-" + getUniqueNames(1)[0];
-    Path bulkDir = new Path(dir);
-
-    fs.delete(bulkDir, true);
+    String dir = getDir("/testBadPermissions-");
 
-    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer1.startDefaultLocalityGroup();
-    writeData(writer1, 0, 333);
-    writer1.close();
+    writeData(dir + "/f1.", aconf, 0, 333);
 
     Path rFilePath = new Path(dir, "f1." + RFile.EXTENSION);
     FsPermission originalPerms = fs.getFileStatus(rFilePath).getPermission();
@@ -160,70 +193,68 @@ public class BulkLoadIT extends AccumuloClusterHarness {
       fs.setPermission(rFilePath, originalPerms);
     }
 
-    originalPerms = fs.getFileStatus(bulkDir).getPermission();
-    fs.setPermission(bulkDir, FsPermission.valueOf("dr--r--r--"));
+    originalPerms = fs.getFileStatus(new Path(dir)).getPermission();
+    fs.setPermission(new Path(dir), FsPermission.valueOf("dr--r--r--"));
     try {
       c.tableOperations().addFilesTo(tableName).from(dir).load();
     } catch (AccumuloException ae) {
       if (!(ae.getCause() instanceof FileNotFoundException))
         fail("Expected FileNotFoundException but threw " + ae.getCause());
     } finally {
-      fs.setPermission(bulkDir, originalPerms);
+      fs.setPermission(new Path(dir), originalPerms);
     }
   }
 
-  @Test
-  public void testBulkFile() throws Exception {
+  private void testBulkFile(boolean offline) throws Exception {
     Connector c = getConnector();
-    String tableName = getUniqueNames(1)[0];
-    c.tableOperations().create(tableName);
     addSplits(tableName, "0333 0666 0999 1333 1666");
 
-    Configuration conf = new Configuration();
-    AccumuloConfiguration aconf = new ServerConfigurationFactory(c.getInstance())
-        .getSystemConfiguration();
-    FileSystem fs = getCluster().getFileSystem();
-
-    String rootPath = cluster.getTemporaryPath().toString();
+    if (offline)
+      c.tableOperations().offline(tableName);
 
-    String dir = rootPath + "/testBulkFile-" + getUniqueNames(1)[0];
+    String dir = getDir("/testBulkFile-");
 
-    fs.delete(new Path(dir), true);
+    Map<String,Set<String>> hashes = new HashMap<>();
+    for (String endRow : Arrays.asList("0333 0666 0999 1333 1666 null".split(" "))) {
+      hashes.put(endRow, new HashSet<>());
+    }
 
-    // TODO verify that data gets loaded in appropriate Tablets
     // 1 Tablet 0333-null
-    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer1.startDefaultLocalityGroup();
-    writeData(writer1, 0, 333);
-    writer1.close();
+    String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+    hashes.get("0333").add(h1);
 
     // 2 Tablets 0666-0334, 0999-0667
-    FileSKVWriter writer2 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f2." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer2.startDefaultLocalityGroup();
-    writeData(writer2, 334, 999);
-    writer2.close();
+    String h2 = writeData(dir + "/f2.", aconf, 334, 999);
+    hashes.get("0666").add(h2);
+    hashes.get("0999").add(h2);
 
     // 2 Tablets 1333-1000, 1666-1334
-    FileSKVWriter writer3 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f3." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer3.startDefaultLocalityGroup();
-    // writeData(writer3, 1000, 1999);
-    writeData(writer3, 1000, 1499);
-    writer3.close();
+    String h3 = writeData(dir + "/f3.", aconf, 1000, 1499);
+    hashes.get("1333").add(h3);
+    hashes.get("1666").add(h3);
 
     // 2 Tablets 1666-1334, >1666
-    FileSKVWriter writer4 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f4." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer4.startDefaultLocalityGroup();
-    writeData(writer4, 1500, 1999);
-    writer4.close();
+    String h4 = writeData(dir + "/f4.", aconf, 1500, 1999);
+    hashes.get("1666").add(h4);
+    hashes.get("null").add(h4);
 
-    // TODO test c.tableOperations().offline(tableName, true);
     c.tableOperations().addFilesTo(tableName).from(dir).load();
 
+    if (offline)
+      c.tableOperations().online(tableName);
+
     verifyData(tableName, 0, 1999);
+    verifyMetadata(tableName, hashes);
+  }
+
+  @Test
+  public void testBulkFile() throws Exception {
+    testBulkFile(false);
+  }
+
+  @Test
+  public void testBulkFileOffline() throws Exception {
+    testBulkFile(true);
   }
 
   private void addSplits(String tableName, String splitString) throws Exception {
@@ -258,11 +289,55 @@ public class BulkLoadIT extends AccumuloClusterHarness {
     }
   }
 
-  private void writeData(FileSKVWriter w, int s, int e) throws Exception {
-    for (int i = s; i <= e; i++) {
-      w.append(new Key(new Text(String.format("%04d", i))),
-          new Value(Integer.toString(i).getBytes(UTF_8)));
+  private void verifyMetadata(String tableName, Map<String,Set<String>> expectedHashes)
+      throws Exception {
+
+    Set<String> endRowsSeen = new HashSet<>();
+
+    String id = getConnector().tableOperations().tableIdMap().get(tableName);
+    try (
+        MetadataScanner scanner = MetadataScanner.builder().from(getConnector()).scanMetadataTable()
+            .overRange(Table.ID.of(id)).fetchFiles().fetchLoaded().fetchPrev().build()) {
+      for (TabletMetadata tablet : scanner) {
+        Assert.assertTrue(tablet.getLoaded().isEmpty());
+
+        Set<String> fileHashes = tablet.getFiles().stream().map(f -> hash(f))
+            .collect(Collectors.toSet());
+
+        String endRow = tablet.getEndRow() == null ? "null" : tablet.getEndRow().toString();
+
+        Assert.assertEquals(expectedHashes.get(endRow), fileHashes);
+
+        endRowsSeen.add(endRow);
+      }
+
+      Assert.assertEquals(expectedHashes.keySet(), endRowsSeen);
     }
   }
 
+  private String hash(String filename) {
+    try {
+      byte data[] = Files.readAllBytes(Paths.get(filename.replaceFirst("^file:", "")));
+      byte hash[] = MessageDigest.getInstance("SHA1").digest(data);
+      return new BigInteger(1, hash).toString(16);
+    } catch (IOException | NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String writeData(String file, AccumuloConfiguration aconf, int s, int e)
+      throws Exception {
+    FileSystem fs = getCluster().getFileSystem();
+    String filename = file + RFile.EXTENSION;
+    try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
+        .forFile(filename, fs, fs.getConf()).withTableConfiguration(aconf).build()) {
+      writer.startDefaultLocalityGroup();
+      for (int i = s; i <= e; i++) {
+        writer.append(new Key(new Text(String.format("%04d", i))),
+            new Value(Integer.toString(i).getBytes(UTF_8)));
+      }
+    }
+
+    return hash(filename);
+  }
 }

