accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [1/2] accumulo git commit: ACCUMULO-3327 cache bulk-loaded files in memory to avoid metadata table lookups
Date Fri, 01 May 2015 21:54:50 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master e7fc66caa -> 87af7f6d8


ACCUMULO-3327 cache bulk-loaded files in memory to avoid metadata table lookups


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/086c6ace
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/086c6ace
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/086c6ace

Branch: refs/heads/master
Commit: 086c6aceede4da90520b1caddb66b382ea8012bd
Parents: 0f0a519
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Fri May 1 17:54:20 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Fri May 1 17:54:20 2015 -0400

----------------------------------------------------------------------
 .../server/util/MasterMetadataUtil.java         |  10 +-
 .../accumulo/server/util/MetadataTableUtil.java |   8 +-
 .../tserver/tablet/DatafileManager.java         |  19 +-
 .../accumulo/tserver/tablet/SplitInfo.java      |  10 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  | 177 ++++++++++++-------
 .../test/functional/SplitRecoveryIT.java        |  12 +-
 .../performance/metadata/FastBulkImportIT.java  | 104 +++++++++++
 7 files changed, 248 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/086c6ace/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index 4a5650e..0a30314 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -60,6 +60,8 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Multimap;
+
 /**
  *
  */
@@ -68,7 +70,7 @@ public class MasterMetadataUtil {
   private static final Logger log = LoggerFactory.getLogger(MasterMetadataUtil.class);
 
   public static void addNewTablet(ClientContext context, KeyExtent extent, String path, TServerInstance
location, Map<FileRef,DataFileValue> datafileSizes,
-      Map<FileRef,Long> bulkLoadedFiles, String time, long lastFlushID, long lastCompactID,
ZooLock zooLock) {
+      Multimap<Long, FileRef> bulkLoadedFiles, String time, long lastFlushID, long
lastCompactID, ZooLock zooLock) {
     Mutation m = extent.getPrevRowUpdateMutation();
 
     TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(path.getBytes(UTF_8)));
@@ -87,9 +89,9 @@ public class MasterMetadataUtil {
       m.put(DataFileColumnFamily.NAME, entry.getKey().meta(), new Value(entry.getValue().encode()));
     }
 
-    for (Entry<FileRef,Long> entry : bulkLoadedFiles.entrySet()) {
-      byte[] tidBytes = Long.toString(entry.getValue()).getBytes();
-      m.put(TabletsSection.BulkFileColumnFamily.NAME, entry.getKey().meta(), new Value(tidBytes));
+    for (Entry<Long, FileRef> entry : bulkLoadedFiles.entries()) {
+      byte[] tidBytes = Long.toString(entry.getKey()).getBytes();
+      m.put(TabletsSection.BulkFileColumnFamily.NAME, entry.getValue().meta(), new Value(tidBytes));
     }
 
     MetadataTableUtil.update(context, zooLock, m, extent);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/086c6ace/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 4470c55..0152002 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -98,6 +98,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 
 /**
  * provides a reference to the metadata table for updates by tablet servers
@@ -924,9 +926,9 @@ public class MetadataTableUtil {
     }
   }
 
-  public static Map<FileRef,Long> getBulkFilesLoaded(ClientContext context, KeyExtent
extent) throws IOException {
+  public static Multimap<Long, FileRef> getBulkFilesLoaded(ClientContext context, KeyExtent
extent) throws IOException {
     Text metadataRow = extent.getMetadataEntry();
-    Map<FileRef,Long> ret = new HashMap<FileRef,Long>();
+    Multimap<Long, FileRef> ret = HashMultimap.create();
 
     VolumeManager fs = VolumeManagerImpl.get();
     Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID,
Authorizations.EMPTY);
@@ -934,7 +936,7 @@ public class MetadataTableUtil {
     scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
     for (Entry<Key,Value> entry : scanner) {
       Long tid = Long.parseLong(entry.getValue().toString());
-      ret.put(new FileRef(fs, entry.getKey()), tid);
+      ret.put(tid, new FileRef(fs, entry.getKey()));
     }
     return ret;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/086c6ace/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index ab15ccc..82ae205 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -29,7 +28,6 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
@@ -207,7 +205,6 @@ class DatafileManager {
 
   public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean
setTime) throws IOException {
 
-    final KeyExtent extent = tablet.getExtent();
     String bulkDir = null;
 
     Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
@@ -235,23 +232,11 @@ class DatafileManager {
 
     }
 
-    if (tablet.getExtent().isRootTablet()) {
-      throw new IllegalArgumentException("Can not import files to root tablet");
+    if (tablet.getExtent().isMeta()) {
+      throw new IllegalArgumentException("Can not import files to a metadata tablet");
     }
 
     synchronized (bulkFileImportLock) {
-      Connector conn;
-      try {
-        conn = tablet.getTabletServer().getConnector();
-      } catch (Exception ex) {
-        throw new IOException(ex);
-      }
-      // Remove any bulk files we've previously loaded and compacted away
-      List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
-
-      for (FileRef file : files)
-        if (paths.keySet().remove(file))
-          log.debug("Ignoring request to re-import a file already imported: " + extent +
": " + file);
 
       if (paths.size() > 0) {
         long bulkTime = Long.MIN_VALUE;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/086c6ace/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
index f8f2183..d866d99 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
@@ -22,6 +22,8 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.master.state.TServerInstance;
 
+import com.google.common.collect.Multimap;
+
 /**
  * operations are disallowed while we split which is ok since splitting is fast
  *
@@ -39,14 +41,16 @@ final public class SplitInfo {
   private final long initFlushID;
   private final long initCompactID;
   private final TServerInstance lastLocation;
+  private final Multimap<Long, FileRef> bulkImported;
 
-  SplitInfo(String d, SortedMap<FileRef,DataFileValue> dfv, String time, long initFlushID,
long initCompactID, TServerInstance lastLocation) {
+  SplitInfo(String d, SortedMap<FileRef,DataFileValue> dfv, String time, long initFlushID,
long initCompactID, TServerInstance lastLocation, Multimap<Long, FileRef> bulkImported)
{
     this.dir = d;
     this.datafiles = dfv;
     this.time = time;
     this.initFlushID = initFlushID;
     this.initCompactID = initCompactID;
     this.lastLocation = lastLocation;
+    this.bulkImported = bulkImported;
   }
 
   public String getDir() {
@@ -73,4 +77,8 @@ final public class SplitInfo {
     return lastLocation;
   }
 
+  public Multimap<Long, FileRef> getBulkImported() {
+    return bulkImported;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/086c6ace/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 17864be..0b2d5e3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -17,6 +17,8 @@
 package org.apache.accumulo.tserver.tablet;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -37,7 +39,9 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -75,6 +79,8 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
@@ -149,6 +155,10 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 
 /**
  *
@@ -243,6 +253,9 @@ public class Tablet implements TabletCommitter {
 
   private final ConfigurationObserver configObserver;
 
+  private final Cache<Long, List<FileRef> > bulkImported = CacheBuilder.newBuilder().expireAfterAccess(4,
TimeUnit.HOURS).build();
+
+
   private final int logId;
 
   @Override
@@ -306,13 +319,13 @@ public class Tablet implements TabletCommitter {
 
   public Tablet(TabletServer tabletServer, KeyExtent extent, TabletResourceManager trm, SplitInfo
info) throws IOException {
     this(tabletServer, new Text(info.getDir()), extent, trm, info.getDatafiles(), info.getTime(),
info.getInitFlushID(), info.getInitCompactID(), info
-        .getLastLocation());
+        .getLastLocation(), info.getBulkImported());
     splitCreationTime = System.currentTimeMillis();
   }
 
   private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager
trm, SortedMap<FileRef,DataFileValue> datafiles,
-      String time, long initFlushID, long initCompactID, TServerInstance lastLocation) throws
IOException {
-    this(tabletServer, extent, location, trm, NO_LOG_ENTRIES, datafiles, time, lastLocation,
new HashSet<FileRef>(), initFlushID, initCompactID);
+      String time, long initFlushID, long initCompactID, TServerInstance lastLocation, Multimap<Long,
FileRef> bulkImported) throws IOException {
+    this(tabletServer, extent, location, trm, NO_LOG_ENTRIES, datafiles, time, lastLocation,
new HashSet<FileRef>(), initFlushID, initCompactID, bulkImported);
   }
 
   private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value>
tabletsKeyValues) {
@@ -338,7 +351,7 @@ public class Tablet implements TabletCommitter {
   private static SortedMap<FileRef,DataFileValue> lookupDatafiles(AccumuloServerContext
context, VolumeManager fs, KeyExtent extent,
       SortedMap<Key,Value> tabletsKeyValues) throws IOException {
 
-    TreeMap<FileRef,DataFileValue> datafiles = new TreeMap<FileRef,DataFileValue>();
+    TreeMap<FileRef,DataFileValue> result = new TreeMap<FileRef,DataFileValue>();
 
     if (extent.isRootTablet()) { // the meta0 tablet
       Path location = new Path(MetadataTableUtil.getRootTabletDir());
@@ -351,114 +364,128 @@ public class Tablet implements TabletCommitter {
         String filename = path.getName();
         FileRef ref = new FileRef(location.toString() + "/" + filename, path);
         DataFileValue dfv = new DataFileValue(0, 0);
-        datafiles.put(ref, dfv);
+        result.put(ref, dfv);
       }
     } else {
       final Text buffer = new Text();
-      final Text row = extent.getMetadataEntry();
 
       for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
         Key k = entry.getKey();
-        k.getRow(buffer);
-        // Double-check that we have the expected row
-        if (row.equals(buffer)) {
-          k.getColumnFamily(buffer);
-          // Ignore anything but file:
-          if (TabletsSection.DataFileColumnFamily.NAME.equals(buffer)) {
-            FileRef ref = new FileRef(fs, k);
-            datafiles.put(ref, new DataFileValue(entry.getValue().get()));
-          }
+        k.getColumnFamily(buffer);
+        // Ignore anything but file:
+        if (TabletsSection.DataFileColumnFamily.NAME.equals(buffer)) {
+          FileRef ref = new FileRef(fs, k);
+          result.put(ref, new DataFileValue(entry.getValue().get()));
         }
       }
     }
-    return datafiles;
+    return result;
   }
 
-  private static List<LogEntry> lookupLogEntries(AccumuloServerContext context, KeyExtent
ke, SortedMap<Key,Value> tabletsKeyValues) {
-    List<LogEntry> logEntries = new ArrayList<LogEntry>();
+  private static List<LogEntry> lookupLogEntries(SortedMap<Key,Value> tabletsKeyValues,
AccumuloServerContext context, KeyExtent ke) {
+    List<LogEntry> result = new ArrayList<LogEntry>();
 
-    if (ke.isMeta()) {
+    if (ke.isRootTablet()) {
       try {
-        logEntries = MetadataTableUtil.getLogEntries(context, ke);
+        result = MetadataTableUtil.getLogEntries(context, ke);
       } catch (Exception ex) {
         throw new RuntimeException("Unable to read tablet log entries", ex);
       }
     } else {
       log.debug("Looking at metadata " + tabletsKeyValues);
-      Text row = ke.getMetadataEntry();
       for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
         Key key = entry.getKey();
-        if (key.getRow().equals(row)) {
-          if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
-            logEntries.add(LogEntry.fromKeyValue(key, entry.getValue()));
-          }
+        if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
+          result.add(LogEntry.fromKeyValue(key, entry.getValue()));
         }
       }
     }
 
-    log.debug("got " + logEntries + " for logs for " + ke);
-    return logEntries;
+    log.debug("got " + result + " for logs for " + ke);
+    return result;
   }
 
-  private static Set<FileRef> lookupScanFiles(KeyExtent extent, SortedMap<Key,Value>
tabletsKeyValues, VolumeManager fs) throws IOException {
-    HashSet<FileRef> scanFiles = new HashSet<FileRef>();
+  private static Set<FileRef> lookupScanFiles(SortedMap<Key,Value> tabletsKeyValues,
VolumeManager fs) throws IOException {
+    HashSet<FileRef> result = new HashSet<FileRef>();
 
-    Text row = extent.getMetadataEntry();
     for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
       Key key = entry.getKey();
-      if (key.getRow().equals(row) && key.getColumnFamily().equals(ScanFileColumnFamily.NAME))
{
-        scanFiles.add(new FileRef(fs, key));
+      if (key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) {
+        result.add(new FileRef(fs, key));
       }
     }
 
-    return scanFiles;
+    return result;
   }
 
-  private static long lookupFlushID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues)
{
-    Text row = extent.getMetadataEntry();
+  private static long lookupFlushID(SortedMap<Key,Value> tabletsKeyValues) {
     for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
       Key key = entry.getKey();
-      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key.getColumnFamily(),
key.getColumnQualifier()))
+      if (FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
         return Long.parseLong(entry.getValue().toString());
     }
 
     return -1;
   }
 
-  private static long lookupCompactID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues)
{
-    Text row = extent.getMetadataEntry();
+  private static long lookupCompactID(SortedMap<Key,Value> tabletsKeyValues) {
     for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
       Key key = entry.getKey();
-      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(),
key.getColumnQualifier()))
+      if (COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
         return Long.parseLong(entry.getValue().toString());
     }
 
     return -1;
   }
 
-  private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap<Key,Value>
tabletsKeyValues) {
+  private static TServerInstance lookupLastServer(SortedMap<Key,Value> tabletsKeyValues)
{
     for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
-      if (entry.getKey().getColumnFamily().compareTo(TabletsSection.LastLocationColumnFamily.NAME)
== 0) {
+      if (entry.getKey().getColumnFamily().compareTo(LastLocationColumnFamily.NAME) == 0)
{
         return new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
       }
     }
     return null;
   }
 
+  private static Multimap<Long, FileRef> lookupBulkImported(SortedMap<Key,Value>
tabletsKeyValues, VolumeManager fs) {
+    Multimap<Long, FileRef> result = HashMultimap.create();
+    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+      if (entry.getKey().getColumnFamily().compareTo(BulkFileColumnFamily.NAME) == 0) {
+        result.put(Long.decode(entry.getValue().toString()), new FileRef(fs, entry.getKey()));
+      }
+    }
+    return result;
+  }
+
   public Tablet(TabletServer tabletServer, KeyExtent extent, Text location, TabletResourceManager
trm, SortedMap<Key,Value> tabletsKeyValues)
       throws IOException {
-    this(tabletServer, extent, location, trm, lookupLogEntries(tabletServer, extent, tabletsKeyValues),
lookupDatafiles(tabletServer,
-        tabletServer.getFileSystem(), extent, tabletsKeyValues), lookupTime(tabletServer.getConfiguration(),
extent, tabletsKeyValues), lookupLastServer(
-        extent, tabletsKeyValues), lookupScanFiles(extent, tabletsKeyValues, tabletServer.getFileSystem()),
lookupFlushID(extent, tabletsKeyValues),
-        lookupCompactID(extent, tabletsKeyValues));
+    this(tabletServer, extent, location, trm,
+        lookupLogEntries(tabletsKeyValues, tabletServer, extent),
+        lookupDatafiles(tabletServer, tabletServer.getFileSystem(), extent, tabletsKeyValues),
+        lookupTime(tabletServer.getConfiguration(), extent, tabletsKeyValues),
+        lookupLastServer(tabletsKeyValues),
+        lookupScanFiles(tabletsKeyValues, tabletServer.getFileSystem()),
+        lookupFlushID(tabletsKeyValues),
+        lookupCompactID(tabletsKeyValues),
+        lookupBulkImported(tabletsKeyValues, tabletServer.getFileSystem()));
   }
 
   /**
    * yet another constructor - this one allows us to avoid costly lookups into the Metadata
table if we already know the files we need - as at split time
    */
-  private Tablet(final TabletServer tabletServer, final KeyExtent extent, final Text location,
final TabletResourceManager trm,
-      final List<LogEntry> rawLogEntries, final SortedMap<FileRef,DataFileValue>
rawDatafiles, String time, final TServerInstance lastLocation,
-      Set<FileRef> scanFiles, long initFlushID, long initCompactID) throws IOException
{
+  private Tablet(
+      final TabletServer tabletServer,
+      final KeyExtent extent,
+      final Text location,
+      final TabletResourceManager trm,
+      final List<LogEntry> rawLogEntries,
+      final SortedMap<FileRef,DataFileValue> rawDatafiles,
+      String time,
+      final TServerInstance lastLocation,
+      final Set<FileRef> scanFiles,
+      final long initFlushID,
+      final long initCompactID,
+      final Multimap<Long, FileRef> bulkImported) throws IOException {
 
     TableConfiguration tblConf = tabletServer.getTableConfiguration(extent);
     if (null == tblConf) {
@@ -574,14 +601,16 @@ public class Tablet implements TabletCommitter {
 
     // Force a load of any per-table properties
     configObserver.propertiesChanged();
+    for (Long key : bulkImported.keys()) {
+      this.bulkImported.put(key, new ArrayList<FileRef>(bulkImported.get(key)));
+    }
 
     if (!logEntries.isEmpty()) {
       log.info("Starting Write-Ahead Log recovery for " + this.extent);
-      // count[0] = entries used on tablet
-      // count[1] = track max time from walog entries wihtout timestamps
-      final long[] count = new long[2];
+      final AtomicLong entriesUsedOnTablet = new AtomicLong(0);
+      // track max time from walog entries without timestamps
+      final AtomicLong maxTime = new AtomicLong(Long.MIN_VALUE);
       final CommitSession commitSession = getTabletMemory().getCommitSession();
-      count[1] = Long.MIN_VALUE;
       try {
         Set<String> absPaths = new HashSet<String>();
         for (FileRef ref : datafiles.keySet())
@@ -596,20 +625,20 @@ public class Tablet implements TabletCommitter {
               if (!columnUpdate.hasTimestamp()) {
                 // if it is not a user set timestamp, it must have been set
                 // by the system
-                count[1] = Math.max(count[1], columnUpdate.getTimestamp());
+                maxTime.set(Math.max(maxTime.get(), columnUpdate.getTimestamp()));
               }
             }
             getTabletMemory().mutate(commitSession, Collections.singletonList(m));
-            count[0]++;
+            entriesUsedOnTablet.incrementAndGet();
           }
         });
 
-        if (count[1] != Long.MIN_VALUE) {
-          tabletTime.useMaxTimeFromWALog(count[1]);
+        if (maxTime.get() != Long.MIN_VALUE) {
+          tabletTime.useMaxTimeFromWALog(maxTime.get());
         }
         commitSession.updateMaxCommittedTime(tabletTime.getTime());
 
-        if (count[0] == 0) {
+        if (entriesUsedOnTablet.get() == 0) {
           log.debug("No replayed mutations applied, removing unused entries for " + extent);
           MetadataTableUtil.removeUnusedWALEntries(getTabletServer(), extent, logEntries,
tabletServer.getLock());
 
@@ -646,7 +675,7 @@ public class Tablet implements TabletCommitter {
         currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.filename,
logEntry.getColumnQualifier().toString()));
       }
 
-      log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] +
" mutations applied, " + getTabletMemory().getNumEntries()
+      log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + entriesUsedOnTablet.get()
+ " mutations applied, " + getTabletMemory().getNumEntries()
           + " entries created)");
     }
 
@@ -2266,7 +2295,7 @@ public class Tablet implements TabletCommitter {
       // it is possible that some of the bulk loading flags will be deleted after being read
below because the bulk load
       // finishes.... therefore split could propagate load flags for a finished bulk load...
there is a special iterator
       // on the metadata table to clean up this type of garbage
-      Map<FileRef,Long> bulkLoadedFiles = MetadataTableUtil.getBulkFilesLoaded(getTabletServer(),
extent);
+      Multimap<Long, FileRef> bulkLoadedFiles = MetadataTableUtil.getBulkFilesLoaded(getTabletServer(),
extent);
 
       MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio, getTabletServer(),
getTabletServer().getLock());
       MasterMetadataUtil.addNewTablet(getTabletServer(), low, lowDirectory, getTabletServer().getTabletSession(),
lowDatafileSizes, bulkLoadedFiles, time,
@@ -2275,8 +2304,8 @@ public class Tablet implements TabletCommitter {
 
       log.log(TLevel.TABLET_HIST, extent + " split " + low + " " + high);
 
-      newTablets.put(high, new SplitInfo(tabletDirectory, highDatafileSizes, time, lastFlushID,
lastCompactID, lastLocation));
-      newTablets.put(low, new SplitInfo(lowDirectory, lowDatafileSizes, time, lastFlushID,
lastCompactID, lastLocation));
+      newTablets.put(high, new SplitInfo(tabletDirectory, highDatafileSizes, time, lastFlushID,
lastCompactID, lastLocation, bulkLoadedFiles));
+      newTablets.put(low, new SplitInfo(lowDirectory, lowDatafileSizes, time, lastFlushID,
lastCompactID, lastLocation, bulkLoadedFiles));
 
       long t2 = System.currentTimeMillis();
 
@@ -2349,8 +2378,21 @@ public class Tablet implements TabletCommitter {
         throw new IOException("Timeout waiting " + (lockWait / 1000.) + " seconds to get
tablet lock");
       }
 
-      if (writesInProgress < 0)
+      List<FileRef> alreadyImported = bulkImported.getIfPresent(tid);
+      if (alreadyImported != null) {
+        for (FileRef entry : alreadyImported) {
+          if (fileMap.remove(entry) != null) {
+            log.info("Ignoring import of bulk file already imported: " + entry);
+          }
+        }
+      }
+      if (fileMap.isEmpty()) {
+        return;
+      }
+
+      if (writesInProgress < 0) {
         throw new IllegalStateException("writesInProgress < 0 " + writesInProgress);
+      }
 
       writesInProgress++;
     }
@@ -2372,6 +2414,17 @@ public class Tablet implements TabletCommitter {
         writesInProgress--;
         if (writesInProgress == 0)
           this.notifyAll();
+
+        try {
+          bulkImported.get(tid, new Callable<List<FileRef>>() {
+            @Override
+            public List<FileRef> call() throws Exception {
+              return new ArrayList<FileRef>();
+            }
+          }).addAll(fileMap.keySet());
+        } catch (Exception ex) {
+          log.info(ex.toString(), ex);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/086c6ace/test/src/test/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
index 0b0e330..2933407 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -68,6 +67,8 @@ import org.apache.accumulo.tserver.TabletServer;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
+import com.google.common.collect.Multimap;
+
 public class SplitRecoveryIT extends ConfigurableMacIT {
 
   @Override
@@ -177,11 +178,12 @@ public class SplitRecoveryIT extends ConfigurableMacIT {
     writer.update(m);
 
     if (steps >= 1) {
-      Map<FileRef,Long> bulkFiles = MetadataTableUtil.getBulkFilesLoaded(context, extent);
+      Multimap<Long,FileRef> bulkFiles = MetadataTableUtil.getBulkFilesLoaded(context,
extent);
       MasterMetadataUtil.addNewTablet(context, low, "/lowDir", instance, lowDatafileSizes,
bulkFiles, TabletTime.LOGICAL_TIME_ID + "0", -1l, -1l, zl);
     }
-    if (steps >= 2)
+    if (steps >= 2) {
       MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove, context,
zl);
+    }
 
     TabletServer.verifyTabletInformation(context, high, instance, null, "127.0.0.1:0", zl);
 
@@ -189,8 +191,8 @@ public class SplitRecoveryIT extends ConfigurableMacIT {
       ensureTabletHasNoUnexpectedMetadataEntries(context, low, lowDatafileSizes);
       ensureTabletHasNoUnexpectedMetadataEntries(context, high, highDatafileSizes);
 
-      Map<FileRef,Long> lowBulkFiles = MetadataTableUtil.getBulkFilesLoaded(context,
low);
-      Map<FileRef,Long> highBulkFiles = MetadataTableUtil.getBulkFilesLoaded(context,
high);
+      Multimap<Long,FileRef> lowBulkFiles = MetadataTableUtil.getBulkFilesLoaded(context,
low);
+      Multimap<Long,FileRef> highBulkFiles = MetadataTableUtil.getBulkFilesLoaded(context,
high);
 
       if (!lowBulkFiles.equals(highBulkFiles)) {
         throw new Exception(" " + lowBulkFiles + " != " + highBulkFiles + " " + low + " "
+ high);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/086c6ace/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java b/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
new file mode 100644
index 0000000..d1ecfe9
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.performance.metadata;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+// ACCUMULO-3327
+/**
+ * Performance check for ACCUMULO-3327: bulk importing many files into a table with many tablets
+ * should finish quickly now that bulk-loaded files are cached in memory instead of being looked up
+ * in the metadata table.
+ */
+public class FastBulkImportIT extends ConfigurableMacIT {
+
+  /** Number of RFiles written and then bulk imported in a single request. */
+  private static final int NUM_BULK_FILES = 100;
+
+  /** Upper bound, in seconds, on how long the bulk import may take before the test fails. */
+  private static final double MAX_IMPORT_SECONDS = 30;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(3);
+    cfg.setProperty(Property.TSERV_BULK_ASSIGNMENT_THREADS, "5");
+    cfg.setProperty(Property.TSERV_BULK_PROCESS_THREADS, "5");
+    // Very high ratio / file-count limits; presumably so that compactions triggered by the many
+    // imported files do not interfere with the timing measured in the test.
+    cfg.setProperty(Property.TABLE_MAJC_RATIO, "9999");
+    cfg.setProperty(Property.TABLE_FILE_MAX, "9999");
+  }
+
+  /**
+   * Creates a table with several hundred splits, writes {@value #NUM_BULK_FILES} RFiles whose keys
+   * span most of those tablets, bulk imports them, and asserts that the import finishes within
+   * {@value #MAX_IMPORT_SECONDS} seconds.
+   */
+  @Test
+  public void test() throws Exception {
+    log.info("Creating table");
+    final String tableName = getUniqueNames(1)[0];
+    final Connector c = getConnector();
+    c.tableOperations().create(tableName);
+
+    log.info("Adding splits");
+    SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 1; i < 0xfff; i += 7) {
+      splits.add(new Text(Integer.toHexString(i)));
+    }
+    c.tableOperations().addSplits(tableName, splits);
+
+    log.info("Creating bulk import files");
+    FileSystem fs = getCluster().getFileSystem();
+    Path basePath = getCluster().getTemporaryPath();
+    CachedConfiguration.setInstance(fs.getConf());
+
+    Path base = new Path(basePath, "fastBulkImport_" + tableName);
+    fs.delete(base, true);
+    fs.mkdirs(base);
+    Path bulkFailures = new Path(base, "failures");
+    Path files = new Path(base, "files");
+    fs.mkdirs(bulkFailures);
+    fs.mkdirs(files);
+
+    FileOperations fileOps = FileOperations.getInstance();
+    AccumuloConfiguration defaultConf = AccumuloConfiguration.getDefaultConfiguration();
+    for (int i = 0; i < NUM_BULK_FILES; i++) {
+      String fileName = files.toString() + "/bulk_" + i + "." + RFile.EXTENSION;
+      FileSKVWriter writer = fileOps.openWriter(fileName, fs, fs.getConf(), defaultConf);
+      try {
+        writer.startDefaultLocalityGroup();
+        // Every file gets the same key range, so each file overlaps most of the tablets above.
+        for (int j = 0x100; j < 0xfff; j += 3) {
+          writer.append(new Key(Integer.toHexString(j)), new Value(new byte[0]));
+        }
+      } finally {
+        // Close even when an append fails so the underlying output stream is not leaked.
+        writer.close();
+      }
+    }
+
+    log.info("Waiting for balance");
+    c.instanceOperations().waitForBalance();
+
+    log.info("Bulk importing files");
+    // nanoTime is monotonic, so the measurement is unaffected by wall-clock adjustments.
+    long start = System.nanoTime();
+    c.tableOperations().importDirectory(tableName, files.toString(), bulkFailures.toString(), true);
+    double diffSeconds = (System.nanoTime() - start) / 1e9;
+    log.info(String.format("Import took %.2f seconds", diffSeconds));
+    assertTrue("Bulk import took " + diffSeconds + " seconds, expected under " + MAX_IMPORT_SECONDS,
+        diffSeconds < MAX_IMPORT_SECONDS);
+  }
+
+}


Mime
View raw message