accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [1/2] accumulo git commit: ACCUMULO-3327 clean up book-keeping periodically, checking against zookeeper
Date Tue, 12 May 2015 16:42:36 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 0b7d00db8 -> e2e5afb2a


ACCUMULO-3327 clean up book-keeping periodically, checking against zookeeper


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8efc9546
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8efc9546
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8efc9546

Branch: refs/heads/master
Commit: 8efc9546400f9b76afc3bfad93046a487c147e82
Parents: 8ccd7e7
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Tue May 12 12:42:14 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Tue May 12 12:42:14 2015 -0400

----------------------------------------------------------------------
 .../server/zookeeper/TransactionWatcher.java    | 21 +++++++
 .../apache/accumulo/tserver/TabletServer.java   |  4 ++
 .../tserver/tablet/BulkImportCacheCleaner.java  | 60 ++++++++++++++++++++
 .../apache/accumulo/tserver/tablet/Tablet.java  | 16 +++++-
 .../performance/metadata/FastBulkImportIT.java  |  2 +-
 5 files changed, 99 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
index 0e1cdfd..da94a3c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
@@ -16,8 +16,13 @@
  */
 package org.apache.accumulo.server.zookeeper;
 
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReader;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -59,6 +64,22 @@ public class TransactionWatcher extends org.apache.accumulo.fate.zookeeper.Trans
       writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", NodeMissingPolicy.SKIP);
     }
 
+    public static Set<Long> allTransactionsAlive(String type) throws KeeperException, InterruptedException {
+      final Instance instance = HdfsZooInstance.getInstance();
+      final IZooReader reader = ZooReaderWriter.getInstance();
+      final Set<Long> result = new HashSet<>();
+      final String parent = ZooUtil.getRoot(instance) + "/" + type;
+      reader.sync(parent);
+      List<String> children = reader.getChildren(parent);
+      for (String child : children) {
+        if (child.endsWith("-running")) {
+          continue;
+        }
+        result.add(Long.parseLong(child));
+      }
+      return result;
+    }
+
     @Override
     public boolean transactionComplete(String type, long tid) throws Exception {
       String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running";

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6f2c9a2..7154732 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -223,6 +223,7 @@ import org.apache.accumulo.tserver.session.ScanSession;
 import org.apache.accumulo.tserver.session.Session;
 import org.apache.accumulo.tserver.session.SessionManager;
 import org.apache.accumulo.tserver.session.UpdateSession;
+import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
 import org.apache.accumulo.tserver.tablet.CommitSession;
 import org.apache.accumulo.tserver.tablet.CompactionInfo;
 import org.apache.accumulo.tserver.tablet.CompactionWatcher;
@@ -2423,6 +2424,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     };
     SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000);
 
+    final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000;
+    SimpleTimer.getInstance(aconf).schedule(new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS);
+
     HostAndPort masterHost;
     while (!serverStopRequested) {
       // send all of the pending messages

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
new file mode 100644
index 0000000..fff2be2
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BulkImportCacheCleaner implements Runnable {
+
+  private static final Logger log = LoggerFactory.getLogger(BulkImportCacheCleaner.class);
+  private final TabletServer server;
+
+  public BulkImportCacheCleaner(TabletServer server) {
+    this.server = server;
+  }
+
+  @Override
+  public void run() {
+    // gather the list of transactions the tablets have cached
+    final Set<Long> tids = new HashSet<>();
+    for (Tablet tablet : server.getOnlineTablets()) {
+      tids.addAll(tablet.getBulkIngestedFiles().keySet());
+    }
+    try {
+      // get the current transactions from ZooKeeper
+      final Set<Long> allTransactionsAlive = ZooArbitrator.allTransactionsAlive(Constants.BULK_ARBITRATOR_TYPE);
+      // remove any that are still alive
+      tids.removeAll(allTransactionsAlive);
+      // cleanup any memory of these transactions
+      for (Tablet tablet : server.getOnlineTablets()) {
+        tablet.cleanupBulkLoadedFiles(tids);
+      }
+    } catch (KeeperException | InterruptedException e) {
+      // we'll just clean it up again later
+      log.debug("Error reading bulk import live transactions {}", e.toString());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 5de3236..7eb2069 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -41,7 +41,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -253,7 +253,7 @@ public class Tablet implements TabletCommitter {
 
   private final ConfigurationObserver configObserver;
 
-  private final Cache<Long,List<FileRef>> bulkImported = CacheBuilder.newBuilder().expireAfterAccess(4, TimeUnit.HOURS).build();
+  private final Cache<Long,List<FileRef>> bulkImported = CacheBuilder.newBuilder().build();
 
   private final int logId;
 
@@ -586,7 +586,7 @@ public class Tablet implements TabletCommitter {
     // Force a load of any per-table properties
     configObserver.propertiesChanged();
     for (Long key : bulkImported.keys()) {
-      this.bulkImported.put(key, new ArrayList<FileRef>(bulkImported.get(key)));
+      this.bulkImported.put(key, new CopyOnWriteArrayList<FileRef>(bulkImported.get(key)));
     }
 
     if (!logEntries.isEmpty()) {
@@ -2792,4 +2792,14 @@ public class Tablet implements TabletCommitter {
     }
   }
 
+  public Map<Long, List<FileRef>> getBulkIngestedFiles() {
+    return new HashMap<Long, List<FileRef>>(bulkImported.asMap());
+  }
+
+  public void cleanupBulkLoadedFiles(Set<Long> tids) {
+    for (Long tid : tids) {
+      bulkImported.invalidate(tid);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java b/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
index 05c907c..5f670cc 100644
--- a/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
@@ -68,7 +68,7 @@ public class FastBulkImportIT extends ConfigurableMacIT {
     }
     c.tableOperations().addSplits(tableName, splits);
 
-    log.info("Creating bulk import files");
+    log.info("Creating lots of bulk import files");
     FileSystem fs = getCluster().getFileSystem();
     Path basePath = getCluster().getTemporaryPath();
     CachedConfiguration.setInstance(fs.getConf());


Mime
View raw message