accumulo-commits mailing list archives

From: e..@apache.org
Subject: accumulo git commit: ACCUMULO-3927 use the bulk-import flags in memory instead of reading the metadata table, added a split stress integration test
Date: Tue, 30 Jun 2015 20:15:46 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master d513f826b -> 971d873e1


ACCUMULO-3927 use the bulk-import flags in memory instead of reading the metadata table, added a split stress integration test


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/971d873e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/971d873e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/971d873e

Branch: refs/heads/master
Commit: 971d873e1448daf553f6999df29c06811331a188
Parents: d513f82
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Tue Jun 30 16:03:28 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Tue Jun 30 16:03:28 2015 -0400

----------------------------------------------------------------------
 .../server/util/MasterMetadataUtil.java         | 13 +--
 .../accumulo/server/util/MetadataTableUtil.java | 15 +--
 .../accumulo/tserver/tablet/SplitInfo.java      | 10 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  | 30 +++---
 .../org/apache/accumulo/test/ManySplitIT.java   | 97 ++++++++++++++++++++
 .../test/functional/SplitRecoveryIT.java        | 10 +-
 6 files changed, 137 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
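
The change is the same across all six files: the Guava Multimap<Long,FileRef> that maps a bulk-load transaction id to the files loaded under it is replaced with plain JDK types, widened at API boundaries to Map<Long,? extends Collection<FileRef>>. That lets a tablet hand its own in-memory map to a split instead of re-reading the bulk-load flags from the metadata table. A minimal sketch of the new shape (names here are illustrative, not from the patch):

    // tid -> files bulk-loaded under that transaction, JDK types only
    Map<Long,List<FileRef>> inMemory = new HashMap<>();
    // any concrete map-of-collections satisfies the widened parameter type:
    Map<Long,? extends Collection<FileRef>> forSplit = inMemory;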


http://git-wip-us.apache.org/repos/asf/accumulo/blob/971d873e/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index da2948d..bac70b2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,8 +61,6 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Multimap;
-
 /**
  *
  */
@@ -70,7 +69,7 @@ public class MasterMetadataUtil {
   private static final Logger log = LoggerFactory.getLogger(MasterMetadataUtil.class);
 
   public static void addNewTablet(ClientContext context, KeyExtent extent, String path, TServerInstance location, Map<FileRef,DataFileValue> datafileSizes,
-      Multimap<Long,FileRef> bulkLoadedFiles, String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
+      Map<Long,? extends Collection<FileRef>> bulkLoadedFiles, String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
     Mutation m = extent.getPrevRowUpdateMutation();
 
     TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(path.getBytes(UTF_8)));
@@ -89,9 +88,11 @@ public class MasterMetadataUtil {
       m.put(DataFileColumnFamily.NAME, entry.getKey().meta(), new Value(entry.getValue().encode()));
     }
 
-    for (Entry<Long,FileRef> entry : bulkLoadedFiles.entries()) {
-      byte[] tidBytes = Long.toString(entry.getKey()).getBytes();
-      m.put(TabletsSection.BulkFileColumnFamily.NAME, entry.getValue().meta(), new Value(tidBytes));
+    for (Entry<Long,? extends Collection<FileRef>> entry : bulkLoadedFiles.entrySet()) {
+      Value tidBytes = new Value(Long.toString(entry.getKey()).getBytes());
+      for (FileRef ref : entry.getValue()) {
+        m.put(TabletsSection.BulkFileColumnFamily.NAME, ref.meta(), new Value(tidBytes));
+      }
     }
 
     MetadataTableUtil.update(context, zooLock, m, extent);
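
One detail in the loop above: the transaction id is encoded into a Value once per entry, and new Value(tidBytes) then goes through Value's copy constructor, so each column in the mutation carries its own copy of the bytes. In isolation (assuming Value(Value), which the core data API provides, and the UTF_8 already statically imported in this file):

    Value tid = new Value(Long.toString(5L).getBytes(UTF_8));
    Value perColumnCopy = new Value(tid); // copy constructor: independent byte array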

http://git-wip-us.apache.org/repos/asf/accumulo/blob/971d873e/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 45d2fef..4a85a88 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -23,6 +23,7 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -95,8 +96,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
 
 /**
  * provides a reference to the metadata table for updates by tablet servers
@@ -923,9 +922,9 @@ public class MetadataTableUtil {
     }
   }
 
-  public static Multimap<Long,FileRef> getBulkFilesLoaded(ClientContext context, KeyExtent extent) throws IOException {
+  public static Map<Long,? extends Collection<FileRef>> getBulkFilesLoaded(ClientContext context, KeyExtent extent) throws IOException {
     Text metadataRow = extent.getMetadataEntry();
-    Multimap<Long,FileRef> ret = HashMultimap.create();
+    Map<Long,List<FileRef>> result = new HashMap<>();
 
     VolumeManager fs = VolumeManagerImpl.get();
     Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY);
@@ -933,9 +932,13 @@ public class MetadataTableUtil {
     scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
     for (Entry<Key,Value> entry : scanner) {
       Long tid = Long.parseLong(entry.getValue().toString());
-      ret.put(tid, new FileRef(fs, entry.getKey()));
+      List<FileRef> lst = result.get(tid);
+      if (lst == null) {
+        result.put(tid, lst = new ArrayList<>());
+      }
+      lst.add(new FileRef(fs, entry.getKey()));
     }
-    return ret;
+    return result;
   }
 
   public static void addBulkLoadInProgressFlag(AccumuloServerContext context, String path) {
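
The get-then-put-if-null in getBulkFilesLoaded above is the standard Java 7 emulation of a multimap. If this branch could assume Java 8, the same grouping would collapse to computeIfAbsent; a sketch under that assumption:

    result.computeIfAbsent(tid, k -> new ArrayList<FileRef>())
        .add(new FileRef(fs, entry.getKey()));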

http://git-wip-us.apache.org/repos/asf/accumulo/blob/971d873e/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
index f2111c7..64b6a11 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SplitInfo.java
@@ -16,14 +16,14 @@
  */
 package org.apache.accumulo.tserver.tablet;
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.SortedMap;
 
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.master.state.TServerInstance;
 
-import com.google.common.collect.Multimap;
-
 /**
  * operations are disallowed while we split which is ok since splitting is fast
  *
@@ -41,10 +41,10 @@ final public class SplitInfo {
   private final long initFlushID;
   private final long initCompactID;
   private final TServerInstance lastLocation;
-  private final Multimap<Long,FileRef> bulkImported;
+  private final Map<Long, ? extends Collection<FileRef>> bulkImported;
 
   SplitInfo(String d, SortedMap<FileRef,DataFileValue> dfv, String time, long initFlushID, long initCompactID, TServerInstance lastLocation,
-      Multimap<Long,FileRef> bulkImported) {
+      Map<Long, ? extends Collection<FileRef>> bulkImported) {
     this.dir = d;
     this.datafiles = dfv;
     this.time = time;
@@ -78,7 +78,7 @@ final public class SplitInfo {
     return lastLocation;
   }
 
-  public Multimap<Long,FileRef> getBulkImported() {
+  public Map<Long, ? extends Collection<FileRef>> getBulkImported() {
     return bulkImported;
   }
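
The wildcard in Map<Long,? extends Collection<FileRef>> is what lets callers pass any concrete map without copying: Java generics are invariant, so a Map<Long,List<FileRef>> is not a Map<Long,Collection<FileRef>>, but it does match the wildcard form. Illustrative sketch:

    Map<Long,List<FileRef>> concrete = new HashMap<>();
    Map<Long,? extends Collection<FileRef>> view = concrete;   // compiles
    // Map<Long,Collection<FileRef>> direct = concrete;        // would not compile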
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/971d873e/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 8b4c7d1..c0fb918 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -158,8 +158,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
 
 /**
  *
@@ -324,7 +322,7 @@ public class Tablet implements TabletCommitter {
   }
 
   private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<FileRef,DataFileValue> datafiles,
-      String time, long initFlushID, long initCompactID, TServerInstance lastLocation, Multimap<Long,FileRef> bulkImported) throws IOException {
+      String time, long initFlushID, long initCompactID, TServerInstance lastLocation, Map<Long, ? extends Collection<FileRef>> bulkImported) throws IOException {
     this(tabletServer, extent, location, trm, NO_LOG_ENTRIES, datafiles, time, lastLocation, new HashSet<FileRef>(), initFlushID, initCompactID, bulkImported);
   }
 
@@ -447,11 +445,16 @@ public class Tablet implements TabletCommitter {
     return null;
   }
 
-  private static Multimap<Long,FileRef> lookupBulkImported(SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) {
-    Multimap<Long,FileRef> result = HashMultimap.create();
+  private static Map<Long,List<FileRef>> lookupBulkImported(SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) {
+    Map<Long,List<FileRef>> result = new HashMap<>();
     for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
       if (entry.getKey().getColumnFamily().compareTo(BulkFileColumnFamily.NAME) == 0) {
+        Long id = Long.decode(entry.getValue().toString());
+        List<FileRef> lst = result.get(id);
+        if (lst == null) {
+          result.put(id, lst = new ArrayList<FileRef>());
+        }
+        lst.add(new FileRef(fs, entry.getKey()));
-        result.put(Long.decode(entry.getValue().toString()), new FileRef(fs, entry.getKey()));
       }
     }
     return result;
       }
     }
     return result;
@@ -470,7 +473,7 @@ public class Tablet implements TabletCommitter {
    */
   private Tablet(final TabletServer tabletServer, final KeyExtent extent, final Text location, final TabletResourceManager trm,
       final List<LogEntry> rawLogEntries, final SortedMap<FileRef,DataFileValue> rawDatafiles, String time, final TServerInstance lastLocation,
-      final Set<FileRef> scanFiles, final long initFlushID, final long initCompactID, final Multimap<Long,FileRef> bulkImported) throws IOException {
+      final Set<FileRef> scanFiles, final long initFlushID, final long initCompactID, final Map<Long, ? extends Collection<FileRef>> bulkImported) throws IOException {
 
     TableConfiguration tblConf = tabletServer.getTableConfiguration(extent);
     if (null == tblConf) {
@@ -587,7 +590,7 @@ public class Tablet implements TabletCommitter {
 
     // Force a load of any per-table properties
     configObserver.propertiesChanged();
-    for (Long key : bulkImported.keys()) {
+    for (Long key : bulkImported.keySet()) {
       this.bulkImported.put(key, new CopyOnWriteArrayList<FileRef>(bulkImported.get(key)));
     }
 
@@ -2293,20 +2296,15 @@ public class Tablet implements TabletCommitter {
 
       String time = tabletTime.getMetadataValue();
 
-      // it is possible that some of the bulk loading flags will be deleted after being read below because the bulk load
-      // finishes.... therefore split could propagate load flags for a finished bulk load... there is a special iterator
-      // on the metadata table to clean up this type of garbage
-      Multimap<Long,FileRef> bulkLoadedFiles = MetadataTableUtil.getBulkFilesLoaded(getTabletServer(), extent);
-
       MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio, getTabletServer(), getTabletServer().getLock());
-      MasterMetadataUtil.addNewTablet(getTabletServer(), low, lowDirectory, getTabletServer().getTabletSession(), lowDatafileSizes, bulkLoadedFiles, time,
+      MasterMetadataUtil.addNewTablet(getTabletServer(), low, lowDirectory, getTabletServer().getTabletSession(), lowDatafileSizes, getBulkIngestedFiles(), time,
           lastFlushID, lastCompactID, getTabletServer().getLock());
       MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove, getTabletServer(), getTabletServer().getLock());

       log.log(TLevel.TABLET_HIST, extent + " split " + low + " " + high);

-      newTablets.put(high, new SplitInfo(tabletDirectory, highDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, bulkLoadedFiles));
-      newTablets.put(low, new SplitInfo(lowDirectory, lowDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, bulkLoadedFiles));
+      newTablets.put(high, new SplitInfo(tabletDirectory, highDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, getBulkIngestedFiles()));
+      newTablets.put(low, new SplitInfo(lowDirectory, lowDatafileSizes, time, lastFlushID, lastCompactID, lastLocation, getBulkIngestedFiles()));
 
       long t2 = System.currentTimeMillis();
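
The deleted comment described the race this hunk removes: flags read from the metadata table could be deleted by a finishing bulk load between the read and the split, so a split could propagate flags for an already-finished load (a special metadata iterator existed to clean up that garbage). Handing the children getBulkIngestedFiles() avoids the metadata read entirely. The accessor's body is not part of this diff; a hypothetical sketch of its shape, consistent with the CopyOnWriteArrayList population in the constructor above:

    // Hypothetical shape only; the real accessor lives elsewhere in Tablet.
    private final Map<Long,List<FileRef>> bulkImported = new ConcurrentHashMap<>();

    public Map<Long,List<FileRef>> getBulkIngestedFiles() {
      return bulkImported; // in-memory view: no metadata scan on the split path
    }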
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/971d873e/test/src/main/java/org/apache/accumulo/test/ManySplitIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ManySplitIT.java b/test/src/main/java/org/apache/accumulo/test/ManySplitIT.java
new file mode 100644
index 0000000..dc61f53
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ManySplitIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertTrue;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+
+public class ManySplitIT extends ConfigurableMacBase {
+
+  final int SPLITS = 10_000;
+
+  @Test(timeout = 2 * 60 * 1000)
+  public void test() throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+
+    log.info("Creating table");
+    final TableOperations tableOperations = getConnector().tableOperations();
+
+    log.info("splitting metadata table");
+    tableOperations.create(tableName);
+    SortedSet<Text> splits = new TreeSet<Text>();
+    for (byte b : "123456789abcde".getBytes(UTF_8)) {
+      splits.add(new Text(new byte[]{'1', ';', b}));
+    }
+    tableOperations.addSplits(MetadataTable.NAME, splits);
+    splits.clear();
+    for (int i = 0; i < SPLITS; i++) {
+      splits.add(new Text(Integer.toHexString(i)));
+    }
+    log.info("Adding splits");
+    // print out the number of splits so we have some idea of what's going on
+    final AtomicBoolean stop = new AtomicBoolean(false);
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        while (!stop.get()) {
+          UtilWaitThread.sleep(1000);
+          try {
+            log.info("splits: " + tableOperations.listSplits(tableName).size());
+          } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
+        }
+      }
+    };
+    t.start();
+    long now = System.currentTimeMillis();
+    tableOperations.addSplits(tableName, splits);
+    long diff = System.currentTimeMillis() - now;
+    double splitsPerSec = SPLITS / (diff / 1000.);
+    log.info("Done: {} splits per second", splitsPerSec);
+    assertTrue("splits created too slowly", splitsPerSec > 100);
+    stop.set(true);
+    t.join();
+  }
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hdfs) {
+    cfg.setNumTservers(1);
+    cfg.setMemory(ServerType.TABLET_SERVER, cfg.getMemory(ServerType.TABLET_SERVER) * 2, MemoryUnit.BYTE);
+  }
+
+}
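
The byte[]{'1', ';', b} rows above pre-split the metadata table itself: metadata rows are encoded as <tableId>;<endRow>, and the '1' assumes the hex id the first user-created table gets in a fresh MiniAccumuloCluster (a property of the test setup, not an API guarantee). For example:

    // Metadata row for the tablet of table id "1" that ends at row "7":
    Text metaRow = new Text(new byte[] {'1', ';', '7'}); // i.e. "1;7"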

http://git-wip-us.apache.org/repos/asf/accumulo/blob/971d873e/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
index 4d13e2a..2026c37 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -20,10 +20,12 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -67,8 +69,6 @@ import org.apache.accumulo.tserver.TabletServer;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
-import com.google.common.collect.Multimap;
-
 public class SplitRecoveryIT extends ConfigurableMacBase {
 
   @Override
@@ -178,7 +178,7 @@ public class SplitRecoveryIT extends ConfigurableMacBase {
     writer.update(m);
 
     if (steps >= 1) {
-      Multimap<Long,FileRef> bulkFiles = MetadataTableUtil.getBulkFilesLoaded(context, extent);
+      Map<Long,? extends Collection<FileRef>> bulkFiles = MetadataTableUtil.getBulkFilesLoaded(context, extent);
       MasterMetadataUtil.addNewTablet(context, low, "/lowDir", instance, lowDatafileSizes, bulkFiles, TabletTime.LOGICAL_TIME_ID + "0", -1l, -1l, zl);
     }
     if (steps >= 2) {
@@ -191,8 +191,8 @@ public class SplitRecoveryIT extends ConfigurableMacBase {
       ensureTabletHasNoUnexpectedMetadataEntries(context, low, lowDatafileSizes);
       ensureTabletHasNoUnexpectedMetadataEntries(context, high, highDatafileSizes);
 
-      Multimap<Long,FileRef> lowBulkFiles = MetadataTableUtil.getBulkFilesLoaded(context, low);
-      Multimap<Long,FileRef> highBulkFiles = MetadataTableUtil.getBulkFilesLoaded(context, high);
+      Map<Long,? extends Collection<FileRef>> lowBulkFiles = MetadataTableUtil.getBulkFilesLoaded(context, low);
+      Map<Long,? extends Collection<FileRef>> highBulkFiles = MetadataTableUtil.getBulkFilesLoaded(context, high);
 
       if (!lowBulkFiles.equals(highBulkFiles)) {
         throw new Exception(" " + lowBulkFiles + " != " + highBulkFiles + " " + low + " " + high);
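
One semantic shift in the comparison above: HashMultimap compared each key's values as a set, while the Map<Long,List<FileRef>> replacement compares lists, so element order now matters. Both sides come from the same sorted metadata scan, so the order is deterministic here; the difference in isolation:

    Map<Long,List<String>> a = Collections.singletonMap(1L, Arrays.asList("x", "y"));
    Map<Long,List<String>> b = Collections.singletonMap(1L, Arrays.asList("y", "x"));
    assert !a.equals(b); // order-sensitive, unlike HashMultimap's set semantics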

