accumulo-commits mailing list archives

From els...@apache.org
Subject [16/16] git commit: Merge branch '1.6'
Date Mon, 20 Oct 2014 01:05:57 GMT
Merge branch '1.6'

Conflicts:
	fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
	server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
	server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
	server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
	server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
	server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
	server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8bea7aba
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8bea7aba
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8bea7aba

Branch: refs/heads/master
Commit: 8bea7aba937d35f03185321869062ade57f83165
Parents: f118b22 0e4159c
Author: Josh Elser <elserj@apache.org>
Authored: Sun Oct 19 20:55:34 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Sun Oct 19 20:56:42 2014 -0400

----------------------------------------------------------------------
 .../accumulo/fate/zookeeper/IZooReader.java     |  20 +-
 .../fate/zookeeper/IZooReaderWriter.java        |  34 +-
 .../apache/accumulo/fate/zookeeper/Retry.java   |  97 +++++
 .../accumulo/fate/zookeeper/RetryFactory.java   |  38 ++
 .../zookeeper/RetryingInvocationHandler.java    |  63 ---
 .../accumulo/fate/zookeeper/ZooCache.java       |  14 +-
 .../accumulo/fate/zookeeper/ZooQueueLock.java   |  20 +-
 .../accumulo/fate/zookeeper/ZooReader.java      | 137 ++++++-
 .../fate/zookeeper/ZooReaderWriter.java         | 158 +++++---
 .../accumulo/fate/zookeeper/ZooSession.java     |  42 +-
 .../apache/accumulo/fate/zookeeper/ZooUtil.java | 394 ++++++++++++++-----
 .../accumulo/fate/zookeeper/RetryTest.java      | 127 ++++++
 .../RetryingInvocationHandlerTest.java          |  87 ----
 .../fate/zookeeper/ZooReaderWriterTest.java     | 193 +++++++++
 .../org/apache/accumulo/server/Accumulo.java    |   2 +-
 .../apache/accumulo/server/init/Initialize.java |   4 +-
 .../accumulo/server/master/LiveTServerSet.java  |   2 +-
 .../security/handler/ZKAuthenticator.java       |  42 +-
 .../server/security/handler/ZKAuthorizor.java   |   8 +-
 .../server/security/handler/ZKPermHandler.java  |  50 +--
 .../accumulo/server/tables/TableManager.java    |  16 +-
 .../server/tablets/UniqueNameAllocator.java     |   2 +-
 .../accumulo/server/zookeeper/ZooQueueLock.java |   8 +-
 .../server/zookeeper/ZooReaderWriter.java       |  44 ---
 .../zookeeper/ZooReaderWriterFactory.java       |  21 -
 .../java/org/apache/accumulo/master/Master.java |   4 +-
 .../master/tableOps/CancelCompactions.java      |  34 +-
 .../accumulo/master/tableOps/CompactRange.java  |   6 +-
 .../master/tableOps/RenameNamespace.java        |   2 +-
 .../accumulo/master/tableOps/RenameTable.java   |   2 +-
 .../apache/accumulo/master/tableOps/Utils.java  |  12 +-
 .../apache/accumulo/master/util/FateAdmin.java  |   2 +-
 .../tserver/tablet/DatafileManager.java         |   4 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |   6 +-
 34 files changed, 1154 insertions(+), 541 deletions(-)
----------------------------------------------------------------------
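
As the file list shows, the heart of this merge is the 1.6 retry rework: RetryingInvocationHandler (a java.lang.reflect.Proxy-based wrapper) is deleted, and explicit Retry/RetryFactory classes take its place, with ZooReader and ZooReaderWriter now retrying internally. A minimal sketch of the explicit-retry shape follows; the names, limits, and flaky read are illustrative, not the actual fate API.

// Sketch only: a counter-based retry object in the assumed shape of the new
// Retry class, plus a caller that loops explicitly instead of relying on a
// dynamic proxy.
public class RetrySketch {
  private final long maxRetries;
  private final long waitMillis;
  private long retriesDone = 0;

  public RetrySketch(long maxRetries, long waitMillis) {
    this.maxRetries = maxRetries;
    this.waitMillis = waitMillis;
  }

  public boolean canRetry() {
    return retriesDone < maxRetries;
  }

  public void useRetry() {
    if (!canRetry())
      throw new IllegalStateException("No retries left");
    retriesDone++;
  }

  public void waitForNextAttempt() throws InterruptedException {
    Thread.sleep(waitMillis);
  }

  private static int calls = 0;

  // Stand-in for a ZooKeeper read that fails twice before succeeding.
  private static String flakyRead() {
    if (++calls < 3)
      throw new RuntimeException("transient ZooKeeper failure");
    return "value";
  }

  public static void main(String[] args) throws InterruptedException {
    RetrySketch retry = new RetrySketch(3, 250);
    while (true) {
      try {
        System.out.println(flakyRead()); // succeeds on the third call
        break;
      } catch (RuntimeException e) {
        if (!retry.canRetry())
          throw e;
        retry.useRetry();
        retry.waitForNextAttempt();
      }
    }
  }
}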


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
index 61c9991,5f6ff71..d46d623
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
@@@ -38,12 -37,12 +38,12 @@@ import org.apache.zookeeper.KeeperExcep
  
  // Utility class for adding all authentication info into ZK
  public final class ZKAuthenticator implements Authenticator {
 -  static final Logger log = Logger.getLogger(ZKAuthenticator.class);
 +  private static final Logger log = Logger.getLogger(ZKAuthenticator.class);
    private static Authenticator zkAuthenticatorInstance = null;
-   
+ 
    private String ZKUserPath;
    private final ZooCache zooCache;
-   
+ 
    public static synchronized Authenticator getInstance() {
      if (zkAuthenticatorInstance == null)
        zkAuthenticatorInstance = new ZKAuthenticator();
@@@ -70,10 -69,10 +70,10 @@@
            zoo.recursiveDelete(ZKUserPath, NodeMissingPolicy.SKIP);
            log.info("Removed " + ZKUserPath + "/" + " from zookeeper");
          }
-         
+ 
          // prep parent node of users with root username
 -        zoo.putPersistentData(ZKUserPath, principal.getBytes(Constants.UTF8), NodeExistsPolicy.FAIL);
 +        zoo.putPersistentData(ZKUserPath, principal.getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.FAIL);
-         
+ 
          constructUser(principal, ZKSecurityTool.createPass(token));
        }
      } catch (KeeperException e) {
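
The recurring conflict in these hunks is charset handling: the 1.6 side used the project-local Constants.UTF8 field, while master standardizes on the JDK's java.nio.charset.StandardCharsets.UTF_8. A small illustration of the replacement, with a made-up value; the Charset overload of getBytes needs no project constant and, unlike getBytes(String), cannot throw a checked UnsupportedEncodingException.

import java.nio.charset.StandardCharsets;

public class CharsetExample {
  public static void main(String[] args) {
    String principal = "root"; // illustrative principal name
    // Charset overload: no checked exception, no dependency on Constants.UTF8.
    byte[] data = principal.getBytes(StandardCharsets.UTF_8);
    System.out.println(data.length + " bytes");
  }
}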

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
index 25dbcce,71b7155..7d27909
--- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
@@@ -64,9 -63,9 +64,9 @@@ public class TableManager 
      log.debug("Creating ZooKeeper entries for new namespace " + namespace + " (ID: " + namespaceId
+ ")");
      String zPath = Constants.ZROOT + "/" + instanceId + Constants.ZNAMESPACES + "/" + namespaceId;
  
-     IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
      zoo.putPersistentData(zPath, new byte[0], existsPolicy);
 -    zoo.putPersistentData(zPath + Constants.ZNAMESPACE_NAME, namespace.getBytes(Constants.UTF8), existsPolicy);
 +    zoo.putPersistentData(zPath + Constants.ZNAMESPACE_NAME, namespace.getBytes(StandardCharsets.UTF_8), existsPolicy);
      zoo.putPersistentData(zPath + Constants.ZNAMESPACE_CONF, new byte[0], existsPolicy);
    }
  
@@@ -77,11 -76,11 +77,11 @@@
      Pair<String,String> qualifiedTableName = Tables.qualify(tableName);
      tableName = qualifiedTableName.getSecond();
      String zTablePath = Constants.ZROOT + "/" + instanceId + Constants.ZTABLES + "/" + tableId;
-     IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
      zoo.putPersistentData(zTablePath, new byte[0], existsPolicy);
      zoo.putPersistentData(zTablePath + Constants.ZTABLE_CONF, new byte[0], existsPolicy);
 -    zoo.putPersistentData(zTablePath + Constants.ZTABLE_NAMESPACE, namespaceId.getBytes(Constants.UTF8), existsPolicy);
 -    zoo.putPersistentData(zTablePath + Constants.ZTABLE_NAME, tableName.getBytes(Constants.UTF8), existsPolicy);
 +    zoo.putPersistentData(zTablePath + Constants.ZTABLE_NAMESPACE, namespaceId.getBytes(StandardCharsets.UTF_8), existsPolicy);
 +    zoo.putPersistentData(zTablePath + Constants.ZTABLE_NAME, tableName.getBytes(StandardCharsets.UTF_8), existsPolicy);
      zoo.putPersistentData(zTablePath + Constants.ZTABLE_FLUSH_ID, ZERO_BYTE, existsPolicy);
      zoo.putPersistentData(zTablePath + Constants.ZTABLE_COMPACT_ID, ZERO_BYTE, existsPolicy);
      zoo.putPersistentData(zTablePath + Constants.ZTABLE_COMPACT_CANCEL_ID, ZERO_BYTE, existsPolicy);
@@@ -133,7 -132,7 +133,7 @@@
      String statePath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE;
  
      try {
-       ZooReaderWriter.getRetryingInstance().mutate(statePath, newState.name().getBytes(StandardCharsets.UTF_8), ZooUtil.PUBLIC, new Mutator() {
 -      ZooReaderWriter.getInstance().mutate(statePath, newState.name().getBytes(Constants.UTF8), ZooUtil.PUBLIC, new Mutator() {
++      ZooReaderWriter.getInstance().mutate(statePath, newState.name().getBytes(StandardCharsets.UTF_8), ZooUtil.PUBLIC, new Mutator() {
          @Override
          public byte[] mutate(byte[] oldData) throws Exception {
            TableState oldState = TableState.UNKNOWN;
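
The mutate(...) calls in this hunk are ZooKeeper read-modify-write updates: read the node's bytes, apply a Mutator, and write the result back, retrying if a concurrent writer got there first. A self-contained sketch of that loop, using an AtomicReference as a stand-in for the ZooKeeper node and a simplified Mutator interface rather than the fate API:

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

public class MutateSketch {
  interface Mutator {
    byte[] mutate(byte[] oldData) throws Exception;
  }

  // Stand-in for a ZooKeeper node; compareAndSet plays the role of
  // setData(path, data, expectedVersion).
  static final AtomicReference<byte[]> node =
      new AtomicReference<>("5".getBytes(StandardCharsets.UTF_8));

  static byte[] mutate(Mutator m) throws Exception {
    while (true) {
      byte[] old = node.get();
      byte[] updated = m.mutate(old);
      if (node.compareAndSet(old, updated))
        return updated;
      // another writer won the race; re-read and try again
    }
  }

  public static void main(String[] args) throws Exception {
    byte[] result = mutate(old -> {
      long v = Long.parseLong(new String(old, StandardCharsets.UTF_8));
      return Long.toString(v + 1).getBytes(StandardCharsets.UTF_8);
    });
    System.out.println(new String(result, StandardCharsets.UTF_8)); // prints 6
  }
}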

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
index 1c1ee62,eabdc0d..38a96d4
--- a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
@@@ -49,11 -48,11 +49,11 @@@ public class UniqueNameAllocator 
        final int allocate = 100 + rand.nextInt(100);
        
        try {
-         byte[] max = ZooReaderWriter.getRetryingInstance().mutate(nextNamePath, null, ZooUtil.PRIVATE, new ZooReaderWriter.Mutator() {
+         byte[] max = ZooReaderWriter.getInstance().mutate(nextNamePath, null, ZooUtil.PRIVATE, new ZooReaderWriter.Mutator() {
            public byte[] mutate(byte[] currentValue) throws Exception {
 -            long l = Long.parseLong(new String(currentValue, Constants.UTF8), Character.MAX_RADIX);
 +            long l = Long.parseLong(new String(currentValue, StandardCharsets.UTF_8), Character.MAX_RADIX);
              l += allocate;
 -            return Long.toString(l, Character.MAX_RADIX).getBytes(Constants.UTF8);
 +            return Long.toString(l, Character.MAX_RADIX).getBytes(StandardCharsets.UTF_8);
            }
          });
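
UniqueNameAllocator hands out file names in batches by bumping a base-36 counter (Character.MAX_RADIX is 36) stored in ZooKeeper, which keeps the generated names short. The arithmetic, with a made-up counter value:

public class Base36Counter {
  public static void main(String[] args) {
    String currentValue = "zz"; // what the ZooKeeper node might hold
    long l = Long.parseLong(currentValue, Character.MAX_RADIX); // 35*36 + 35 = 1295
    l += 100; // reserve a batch of 100 names
    System.out.println(Long.toString(l, Character.MAX_RADIX)); // prints "12r"
  }
}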
          

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
index 53cf1c4,b02e5d4..9a968a9
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
@@@ -24,15 -24,15 +24,15 @@@ import org.apache.accumulo.fate.zookeep
  import org.apache.zookeeper.KeeperException;
  
  public class ZooQueueLock extends org.apache.accumulo.fate.zookeeper.ZooQueueLock {
-   
+ 
    public ZooQueueLock(String path, boolean ephemeral) throws KeeperException, InterruptedException {
-     super(ZooReaderWriter.getRetryingInstance(), path, ephemeral);
+     super(ZooReaderWriter.getInstance(), path, ephemeral);
    }
-   
+ 
    public static void main(String args[]) throws InterruptedException, KeeperException {
      ZooQueueLock lock = new ZooQueueLock("/lock", true);
 -    DistributedReadWriteLock rlocker = new DistributedReadWriteLock(lock, "reader".getBytes(Constants.UTF8));
 -    DistributedReadWriteLock wlocker = new DistributedReadWriteLock(lock, "wlocker".getBytes(Constants.UTF8));
 +    DistributedReadWriteLock rlocker = new DistributedReadWriteLock(lock, "reader".getBytes(StandardCharsets.UTF_8));
 +    DistributedReadWriteLock wlocker = new DistributedReadWriteLock(lock, "wlocker".getBytes(StandardCharsets.UTF_8));
      final Lock readLock = rlocker.readLock();
      readLock.lock();
      final Lock readLock2 = rlocker.readLock();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
index 20afc90,435591d..e6d2350
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
@@@ -16,28 -16,18 +16,19 @@@
   */
  package org.apache.accumulo.server.zookeeper;
  
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
 -import org.apache.accumulo.core.Constants;
 +import java.nio.charset.StandardCharsets;
 +
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.Property;
 -import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
- import org.apache.accumulo.fate.util.UtilWaitThread;
- import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
- import org.apache.log4j.Logger;
- import org.apache.zookeeper.KeeperException;
  
  public class ZooReaderWriter extends org.apache.accumulo.fate.zookeeper.ZooReaderWriter {
    private static final String SCHEME = "digest";
    private static final String USER = "accumulo";
    private static ZooReaderWriter instance = null;
-   private static IZooReaderWriter retryingInstance = null;
    
    public ZooReaderWriter(String string, int timeInMillis, String secret) {
 -    super(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes(Constants.UTF8));
 +    super(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes(StandardCharsets.UTF_8));
    }
    
    public static synchronized ZooReaderWriter getInstance() {
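
For contrast with the Retry classes above, the deleted RetryingInvocationHandler wrapped every IZooReaderWriter call in a generic reflective retry via java.lang.reflect.Proxy; with retries now built into ZooReaderWriter itself, the second singleton and the reflection imports disappear. A rough sketch of the removed proxy approach, with illustrative names rather than the original class:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyRetrySketch {
  interface Reader {
    String read(String path);
  }

  @SuppressWarnings("unchecked")
  static <T> T retrying(Class<T> iface, T delegate, int maxRetries) {
    // Every method of the interface funnels through this handler.
    InvocationHandler handler = (proxy, method, args) -> {
      RuntimeException last = null;
      for (int i = 0; i <= maxRetries; i++) {
        try {
          return method.invoke(delegate, args);
        } catch (Exception e) {
          last = new RuntimeException(e);
        }
      }
      throw last;
    };
    return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
  }

  public static void main(String[] args) {
    Reader plain = path -> "data@" + path;
    Reader withRetries = retrying(Reader.class, plain, 3);
    System.out.println(withRetries.read("/accumulo"));
  }
}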

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
index 8494682,4a7111d..e07c2b9
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
@@@ -16,16 -16,12 +16,13 @@@
   */
  package org.apache.accumulo.server.zookeeper;
  
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Proxy;
 -import org.apache.accumulo.core.Constants;
 +import java.nio.charset.StandardCharsets;
 +
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
  import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
- import org.apache.accumulo.fate.zookeeper.RetryingInvocationHandler;
  import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 -import org.apache.accumulo.server.conf.ServerConfiguration;
  
  /**
   * A factory for {@link ZooReaderWriter} objects.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index b435b0f,f894afc..42495f4
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -1052,14 -997,13 +1052,14 @@@ public class Master implements LiveTSer
  
      try {
        final AgeOffStore<Master> store = new AgeOffStore<Master>(new org.apache.accumulo.fate.ZooStore<Master>(ZooUtil.getRoot(instance) + Constants.ZFATE,
-           ZooReaderWriter.getRetryingInstance()), 1000 * 60 * 60 * 8);
+           ZooReaderWriter.getInstance()), 1000 * 60 * 60 * 8);
  
 -      int threads = this.getConfiguration().getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE);
 +      int threads = getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE);
  
 -      fate = new Fate<Master>(this, store, threads);
 +      fate = new Fate<Master>(this, store);
 +      fate.startTransactionRunners(threads);
  
 -      SimpleTimer.getInstance().schedule(new Runnable() {
 +      SimpleTimer.getInstance(getConfiguration()).schedule(new Runnable() {
  
          @Override
          public void run() {
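
One behavioral change in this hunk: Fate is now constructed first and its worker threads are started afterwards via startTransactionRunners(threads), instead of the constructor spinning them up. Starting threads in a constructor can publish a partially constructed object to those threads; a small sketch of the construct-then-start split, with illustrative names:

import java.util.ArrayList;
import java.util.List;

public class RunnerPool {
  private final List<Thread> runners = new ArrayList<>();

  public void startTransactionRunners(int threads) {
    for (int i = 0; i < threads; i++) {
      Thread t = new Thread(() -> {
        // a real runner would poll the transaction store for work here
      }, "Runner-" + i);
      t.setDaemon(true);
      t.start();
      runners.add(t);
    }
  }

  public static void main(String[] args) {
    RunnerPool pool = new RunnerPool(); // construct fully first...
    pool.startTransactionRunners(4);    // ...then start the threads
  }
}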

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
index c6ead8c,ebceaa0..eb49a35
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
@@@ -75,30 -73,30 +75,30 @@@ public class CancelCompactions extends 
      String zCompactID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;
      String zCancelID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
          + Constants.ZTABLE_COMPACT_CANCEL_ID;
-     
-     IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
-     
+ 
+     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ 
      byte[] currentValue = zoo.getData(zCompactID, null);
-     
+ 
 -    String cvs = new String(currentValue, Constants.UTF8);
 +    String cvs = new String(currentValue, StandardCharsets.UTF_8);
      String[] tokens = cvs.split(",");
      final long flushID = Long.parseLong(tokens[0]);
-     
+ 
      zoo.mutate(zCancelID, null, null, new Mutator() {
        @Override
        public byte[] mutate(byte[] currentValue) throws Exception {
 -        long cid = Long.parseLong(new String(currentValue, Constants.UTF8));
 +        long cid = Long.parseLong(new String(currentValue, StandardCharsets.UTF_8));
-         
+ 
          if (cid < flushID)
 -          return Long.toString(flushID).getBytes(Constants.UTF8);
 +          return Long.toString(flushID).getBytes(StandardCharsets.UTF_8);
          else
 -          return Long.toString(cid).getBytes(Constants.UTF8);
 +          return Long.toString(cid).getBytes(StandardCharsets.UTF_8);
        }
      });
-     
+ 
      return new FinishCancelCompaction(tableId);
    }
-   
+ 
    @Override
    public void undo(long tid, Master environment) throws Exception {
      Utils.unreserveNamespace(namespaceId, tid, false);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
index a741085,7ede305..6961776
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
@@@ -118,9 -117,9 +118,9 @@@ public class Utils 
      Instance instance = HdfsZooInstance.getInstance();
  
      String resvPath = ZooUtil.getRoot(instance) + Constants.ZHDFS_RESERVATIONS + "/"
 -        + Base64.encodeBase64String(directory.getBytes(Constants.UTF8));
 +        + Base64.encodeBase64String(directory.getBytes(StandardCharsets.UTF_8));
  
-     IZooReaderWriter zk = ZooReaderWriter.getRetryingInstance();
+     IZooReaderWriter zk = ZooReaderWriter.getInstance();
  
      if (ZooReservation.attempt(zk, resvPath, String.format("%016x", tid), "")) {
        return 0;
@@@ -131,8 -130,8 +131,8 @@@
    public static void unreserveHdfsDirectory(String directory, long tid) throws KeeperException,
InterruptedException {
      Instance instance = HdfsZooInstance.getInstance();
      String resvPath = ZooUtil.getRoot(instance) + Constants.ZHDFS_RESERVATIONS + "/"
 -        + Base64.encodeBase64String(directory.getBytes(Constants.UTF8));
 +        + Base64.encodeBase64String(directory.getBytes(StandardCharsets.UTF_8));
-     ZooReservation.release(ZooReaderWriter.getRetryingInstance(), resvPath, String.format("%016x", tid));
+     ZooReservation.release(ZooReaderWriter.getInstance(), resvPath, String.format("%016x", tid));
    }
  
    private static Lock getLock(String tableId, long tid, boolean writeLock) throws Exception {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8bea7aba/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 78a2ed6,0000000..a84f092
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@@ -1,608 -1,0 +1,608 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.tablet;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
 +import java.util.TreeSet;
 +
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
 +import org.apache.accumulo.core.replication.StatusUtil;
 +import org.apache.accumulo.core.security.Credentials;
 +import org.apache.accumulo.core.util.MapCounter;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.fs.FileRef;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.util.MasterMetadataUtil;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.accumulo.server.util.ReplicationTableUtil;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.tserver.TLevel;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +class DatafileManager {
 +  private final Logger log = Logger.getLogger(DatafileManager.class);
 +  // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
 +  private final Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
 +  private final Tablet tablet;
 +
 +  // ensure we only have one reader/writer of our bulk file notes at at time
 +  private final Object bulkFileImportLock = new Object();
 +
 +  DatafileManager(Tablet tablet, SortedMap<FileRef,DataFileValue> datafileSizes) {
 +    for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet()) {
 +      this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
 +    }
 +    this.tablet = tablet;
 +  }
 +
 +  private FileRef mergingMinorCompactionFile = null;
 +  private final Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
 +  private final Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>();
 +  private final MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>();
 +  private long nextScanReservationId = 0;
 +  private boolean reservationsBlocked = false;
 +
 +  private final Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
 +
 +  static void rename(VolumeManager fs, Path src, Path dst) throws IOException {
 +    if (!fs.rename(src, dst)) {
 +      throw new IOException("Rename " + src + " to " + dst + " returned false ");
 +    }
 +  }
 +
 +  Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
 +    synchronized (tablet) {
 +
 +      while (reservationsBlocked) {
 +        try {
 +          tablet.wait(50);
 +        } catch (InterruptedException e) {
 +          log.warn(e, e);
 +        }
 +      }
 +
 +      Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet());
 +
 +      long rid = nextScanReservationId++;
 +
 +      scanFileReservations.put(rid, absFilePaths);
 +
 +      Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
 +
 +      for (FileRef path : absFilePaths) {
 +        fileScanReferenceCounts.increment(path, 1);
 +        ret.put(path, datafileSizes.get(path));
 +      }
 +
 +      return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
 +    }
 +  }
 +
 +  void returnFilesForScan(Long reservationId) {
 +
 +    final Set<FileRef> filesToDelete = new HashSet<FileRef>();
 +
 +    synchronized (tablet) {
 +      Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId);
 +
 +      if (absFilePaths == null)
 +        throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
 +
 +      boolean notify = false;
 +      for (FileRef path : absFilePaths) {
 +        long refCount = fileScanReferenceCounts.decrement(path, 1);
 +        if (refCount == 0) {
 +          if (filesToDeleteAfterScan.remove(path))
 +            filesToDelete.add(path);
 +          notify = true;
 +        } else if (refCount < 0)
 +          throw new IllegalStateException("Scan ref count for " + path + " is " + refCount);
 +      }
 +
 +      if (notify)
 +        tablet.notifyAll();
 +    }
 +
 +    if (filesToDelete.size() > 0) {
 +      log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete);
 +      MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock());
 +    }
 +  }
 +
 +  void removeFilesAfterScan(Set<FileRef> scanFiles) {
 +    if (scanFiles.size() == 0)
 +      return;
 +
 +    Set<FileRef> filesToDelete = new HashSet<FileRef>();
 +
 +    synchronized (tablet) {
 +      for (FileRef path : scanFiles) {
 +        if (fileScanReferenceCounts.get(path) == 0)
 +          filesToDelete.add(path);
 +        else
 +          filesToDeleteAfterScan.add(path);
 +      }
 +    }
 +
 +    if (filesToDelete.size() > 0) {
 +      log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete);
 +      MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock());
 +    }
 +  }
 +
 +  private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
 +    long startTime = System.currentTimeMillis();
 +    TreeSet<FileRef> inUse = new TreeSet<FileRef>();
 +
 +    Span waitForScans = Trace.start("waitForScans");
 +    try {
 +      synchronized (tablet) {
 +        if (blockNewScans) {
 +          if (reservationsBlocked)
 +            throw new IllegalStateException();
 +
 +          reservationsBlocked = true;
 +        }
 +
 +        for (FileRef path : pathsToWaitFor) {
 +          while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
 +            try {
 +              tablet.wait(100);
 +            } catch (InterruptedException e) {
 +              log.warn(e, e);
 +            }
 +          }
 +        }
 +
 +        for (FileRef path : pathsToWaitFor) {
 +          if (fileScanReferenceCounts.get(path) > 0)
 +            inUse.add(path);
 +        }
 +
 +        if (blockNewScans) {
 +          reservationsBlocked = false;
 +          tablet.notifyAll();
 +        }
 +
 +      }
 +    } finally {
 +      waitForScans.stop();
 +    }
 +    return inUse;
 +  }
 +
 +  public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {
 +
 +    final KeyExtent extent = tablet.getExtent();
 +    String bulkDir = null;
 +
 +    Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
 +    for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
 +      paths.put(entry.getKey(), entry.getValue());
 +
 +    for (FileRef tpath : paths.keySet()) {
 +
 +      boolean inTheRightDirectory = false;
 +      Path parent = tpath.path().getParent().getParent();
 +      for (String tablesDir : ServerConstants.getTablesDirs()) {
 +        if (parent.equals(new Path(tablesDir, tablet.getExtent().getTableId().toString()))) {
 +          inTheRightDirectory = true;
 +          break;
 +        }
 +      }
 +      if (!inTheRightDirectory) {
 +        throw new IOException("Data file " + tpath + " not in table dirs");
 +      }
 +
 +      if (bulkDir == null)
 +        bulkDir = tpath.path().getParent().toString();
 +      else if (!bulkDir.equals(tpath.path().getParent().toString()))
 +        throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + "
" + tpath);
 +
 +    }
 +
 +    if (tablet.getExtent().isRootTablet()) {
 +      throw new IllegalArgumentException("Can not import files to root tablet");
 +    }
 +
 +    synchronized (bulkFileImportLock) {
 +      Credentials creds = SystemCredentials.get();
 +      Connector conn;
 +      try {
 +        conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
 +      } catch (Exception ex) {
 +        throw new IOException(ex);
 +      }
 +      // Remove any bulk files we've previously loaded and compacted away
 +      List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
 +
 +      for (FileRef file : files)
 +        if (paths.keySet().remove(file))
 +          log.debug("Ignoring request to re-import a file already imported: " + extent +
": " + file);
 +
 +      if (paths.size() > 0) {
 +        long bulkTime = Long.MIN_VALUE;
 +        if (setTime) {
 +          for (DataFileValue dfv : paths.values()) {
 +            long nextTime = tablet.getAndUpdateTime();
 +            if (nextTime < bulkTime)
 +              throw new IllegalStateException("Time went backwards unexpectedly " + nextTime
+ " " + bulkTime);
 +            bulkTime = nextTime;
 +            dfv.setTime(bulkTime);
 +          }
 +        }
 +
 +        tablet.updatePersistedTime(bulkTime, paths, tid);
 +      }
 +    }
 +
 +    synchronized (tablet) {
 +      for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
 +        if (datafileSizes.containsKey(tpath.getKey())) {
 +          log.error("Adding file that is already in set " + tpath.getKey());
 +        }
 +        datafileSizes.put(tpath.getKey(), tpath.getValue());
 +
 +      }
 +
 +      tablet.getTabletResources().importedMapFiles();
 +
 +      tablet.computeNumEntries();
 +    }
 +
 +    for (Entry<FileRef,DataFileValue> entry : paths.entrySet()) {
 +      log.log(TLevel.TABLET_HIST, tablet.getExtent() + " import " + entry.getKey() + " " + entry.getValue());
 +    }
 +  }
 +
 +  FileRef reserveMergingMinorCompactionFile() {
 +    if (mergingMinorCompactionFile != null)
 +      throw new IllegalStateException("Tried to reserve merging minor compaction file when
already reserved  : " + mergingMinorCompactionFile);
 +
 +    if (tablet.getExtent().isRootTablet())
 +      return null;
 +
 +    int maxFiles = tablet.getTableConfiguration().getMaxFilesPerTablet();
 +
 +    // when a major compaction is running and we are at max files, write out
 +    // one extra file... want to avoid the case where major compaction is
 +    // compacting everything except for the largest file, and therefore the
 +    // largest file is returned for merging.. the following check mostly
 +    // avoids this case, except for the case where major compactions fail or
 +    // are canceled
 +    if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
 +      return null;
 +
 +    if (datafileSizes.size() >= maxFiles) {
 +      // find the smallest file
 +
 +      long min = Long.MAX_VALUE;
 +      FileRef minName = null;
 +
 +      for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
 +        if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
 +          min = entry.getValue().getSize();
 +          minName = entry.getKey();
 +        }
 +      }
 +
 +      if (minName == null)
 +        return null;
 +
 +      mergingMinorCompactionFile = minName;
 +      return minName;
 +    }
 +
 +    return null;
 +  }
 +
 +  void unreserveMergingMinorCompactionFile(FileRef file) {
 +    if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
 +        || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
 +      throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
 +
 +    mergingMinorCompactionFile = null;
 +  }
 +
 +  void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
 +      throws IOException {
 +
-     IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
++    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +    if (tablet.getExtent().isRootTablet()) {
 +      try {
 +        if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) {
 +          throw new IllegalStateException();
 +        }
 +      } catch (Exception e) {
 +        throw new IllegalStateException("Can not bring major compaction online, lock not
held", e);
 +      }
 +    }
 +
 +    // rename before putting in metadata table, so files in metadata table should
 +    // always exist
 +    do {
 +      try {
 +        if (dfv.getNumEntries() == 0) {
 +          tablet.getTabletServer().getFileSystem().deleteRecursively(tmpDatafile.path());
 +        } else {
 +          if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) {
 +            log.warn("Target map file already exist " + newDatafile);
 +            tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path());
 +          }
 +
 +          rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path());
 +        }
 +        break;
 +      } catch (IOException ioe) {
 +        log.warn("Tablet " + tablet.getExtent() + " failed to rename " + newDatafile + "
after MinC, will retry in 60 secs...", ioe);
 +        UtilWaitThread.sleep(60 * 1000);
 +      }
 +    } while (true);
 +
 +    long t1, t2;
 +
 +    // the code below always assumes merged files are in use by scans... this must be done
 +    // because the in memory list of files is not updated until after the metadata table
 +    // therefore the file is available to scans until memory is updated, but want to ensure
 +    // the file is not available for garbage collection... if memory were updated
 +    // before this point (like major compactions do), then the following code could wait
 +    // for scans to finish like major compactions do.... used to wait for scans to finish
 +    // here, but that was incorrect because a scan could start after waiting but before
 +    // memory was updated... assuming the file is always in use by scans leads to
 +    // one uneeded metadata update when it was not actually in use
 +    Set<FileRef> filesInUseByScans = Collections.emptySet();
 +    if (absMergeFile != null)
 +      filesInUseByScans = Collections.singleton(absMergeFile);
 +
 +    // very important to write delete entries outside of log lock, because
 +    // this metadata write does not go up... it goes sideways or to itself
 +    if (absMergeFile != null)
 +      MetadataTableUtil.addDeleteEntries(tablet.getExtent(), Collections.singleton(absMergeFile), SystemCredentials.get());
 +
 +    Set<String> unusedWalLogs = tablet.beginClearingUnusedLogs();
 +    boolean replicate = ReplicationConfigurationUtil.isEnabled(tablet.getExtent(), tablet.getTableConfiguration());
 +    Set<String> logFileOnly = null;
 +    if (replicate) {
 +      // unusedWalLogs is of the form host/fileURI, need to strip off the host portion
 +      logFileOnly = new HashSet<>();
 +      for (String unusedWalLog : unusedWalLogs) {
 +        int index = unusedWalLog.indexOf('/');
 +        if (-1 == index) {
 +          log.warn("Could not find host component to strip from DFSLogger representation
of WAL");
 +        } else {
 +          unusedWalLog = unusedWalLog.substring(index + 1);
 +        }
 +        logFileOnly.add(unusedWalLog);
 +      }
 +    }
 +    try {
 +      // the order of writing to metadata and walog is important in the face of machine/process failures
 +      // need to write to metadata before writing to walog, when things are done in the reverse order
 +      // data could be lost... the minor compaction start even should be written before the following metadata
 +      // write is made
 +
 +      tablet.updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, absMergeFile, dfv, unusedWalLogs, filesInUseByScans, flushId);
 +
 +      // Mark that we have data we want to replicate
 +      // This WAL could still be in use by other Tablets *from the same table*, so we can only mark that there is data to replicate,
 +      // but it is *not* closed
 +      if (replicate) {
 +        if (log.isDebugEnabled()) {
 +          log.debug("Recording that data has been ingested into " + tablet.getExtent() +
" using " + logFileOnly);
 +        }
 +        ReplicationTableUtil.updateFiles(SystemCredentials.get(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength());
 +      }
 +    } finally {
 +      tablet.finishClearingUnusedLogs();
 +    }
 +
 +    do {
 +      try {
 +        // the purpose of making this update use the new commit session, instead of the old one passed in,
 +        // is because the new one will reference the logs used by current memory...
 +
 +        tablet.getTabletServer().minorCompactionFinished(tablet.getTabletMemory().getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
 +        break;
 +      } catch (IOException e) {
 +        log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry",
e);
 +        UtilWaitThread.sleep(1 * 1000);
 +      }
 +    } while (true);
 +
 +    synchronized (tablet) {
 +      t1 = System.currentTimeMillis();
 +
 +      if (datafileSizes.containsKey(newDatafile)) {
 +        log.error("Adding file that is already in set " + newDatafile);
 +      }
 +
 +      if (dfv.getNumEntries() > 0) {
 +        datafileSizes.put(newDatafile, dfv);
 +      }
 +
 +      if (absMergeFile != null) {
 +        datafileSizes.remove(absMergeFile);
 +      }
 +
 +      unreserveMergingMinorCompactionFile(absMergeFile);
 +
 +      tablet.flushComplete(flushId);
 +
 +      t2 = System.currentTimeMillis();
 +    }
 +
 +    // must do this after list of files in memory is updated above
 +    removeFilesAfterScan(filesInUseByScans);
 +
 +    if (absMergeFile != null)
 +      log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [" + absMergeFile + ",memory]
-> " + newDatafile);
 +    else
 +      log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [memory] -> " + newDatafile);
 +    log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, tablet.getExtent().toString()));
 +    long splitSize = tablet.getTableConfiguration().getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD);
 +    if (dfv.getSize() > splitSize) {
 +      log.debug(String.format("Minor Compaction wrote out file larger than split threshold.
 split threshold = %,d  file size = %,d", splitSize, dfv.getSize()));
 +    }
 +  }
 +
 +  public void reserveMajorCompactingFiles(Collection<FileRef> files) {
 +    if (majorCompactingFiles.size() != 0)
 +      throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
 +
 +    if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile))
 +      throw new IllegalStateException("Major compaction tried to resrve file in use by minor
compaction " + mergingMinorCompactionFile);
 +
 +    majorCompactingFiles.addAll(files);
 +  }
 +
 +  public void clearMajorCompactingFile() {
 +    majorCompactingFiles.clear();
 +  }
 +
 +  void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
 +      throws IOException {
 +    final KeyExtent extent = tablet.getExtent();
 +    long t1, t2;
 +
 +    if (!extent.isRootTablet()) {
 +
 +      if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) {
 +        log.error("Target map file already exist " + newDatafile, new Exception());
 +        throw new IllegalStateException("Target map file already exist " + newDatafile);
 +      }
 +
 +      // rename before putting in metadata table, so files in metadata table should
 +      // always exist
 +      rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path());
 +
 +      if (dfv.getNumEntries() == 0) {
 +        tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path());
 +      }
 +    }
 +
 +    TServerInstance lastLocation = null;
 +    synchronized (tablet) {
 +
 +      t1 = System.currentTimeMillis();
 +
-       IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
++      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +
 +      tablet.incrementDataSourceDeletions();
 +
 +      if (extent.isRootTablet()) {
 +
 +        waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
 +
 +        try {
 +          if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) {
 +            throw new IllegalStateException();
 +          }
 +        } catch (Exception e) {
 +          throw new IllegalStateException("Can not bring major compaction online, lock not
held", e);
 +        }
 +
 +        // mark files as ready for deletion, but
 +        // do not delete them until we successfully
 +        // rename the compacted map file, in case
 +        // the system goes down
 +
 +        RootFiles.replaceFiles(tablet.getTableConfiguration(), tablet.getTabletServer().getFileSystem(), tablet.getLocation(), oldDatafiles, tmpDatafile, newDatafile);
 +      }
 +
 +      // atomically remove old files and add new file
 +      for (FileRef oldDatafile : oldDatafiles) {
 +        if (!datafileSizes.containsKey(oldDatafile)) {
 +          log.error("file does not exist in set " + oldDatafile);
 +        }
 +        datafileSizes.remove(oldDatafile);
 +        majorCompactingFiles.remove(oldDatafile);
 +      }
 +
 +      if (datafileSizes.containsKey(newDatafile)) {
 +        log.error("Adding file that is already in set " + newDatafile);
 +      }
 +
 +      if (dfv.getNumEntries() > 0) {
 +        datafileSizes.put(newDatafile, dfv);
 +      }
 +
 +      // could be used by a follow on compaction in a multipass compaction
 +      majorCompactingFiles.add(newDatafile);
 +
 +      tablet.computeNumEntries();
 +
 +      lastLocation = tablet.resetLastLocation();
 +
 +      tablet.setLastCompactionID(compactionId);
 +      t2 = System.currentTimeMillis();
 +    }
 +
 +    if (!extent.isRootTablet()) {
 +      Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
 +      if (filesInUseByScans.size() > 0)
 +        log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
 +      MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
 +          tablet.getTabletServer().getClientAddressString(), lastLocation, tablet.getTabletServer().getLock());
 +      removeFilesAfterScan(filesInUseByScans);
 +    }
 +
 +    log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
 +    log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile);
 +  }
 +
 +  public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
 +    synchronized (tablet) {
 +      TreeMap<FileRef,DataFileValue> copy = new TreeMap<FileRef,DataFileValue>(datafileSizes);
 +      return Collections.unmodifiableSortedMap(copy);
 +    }
 +  }
 +
 +  public Set<FileRef> getFiles() {
 +    synchronized (tablet) {
 +      HashSet<FileRef> files = new HashSet<FileRef>(datafileSizes.keySet());
 +      return Collections.unmodifiableSet(files);
 +    }
 +  }
 +
 +  public int getNumFiles() {
 +    return datafileSizes.size();
 +  }
 +
 +}
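
The reserveFilesForScan/returnFilesForScan/removeFilesAfterScan trio above implements reference-counted file lifetimes: each scan bumps a per-file count, and a file flagged for deletion is only removed from the metadata table once its count drops to zero. A compact stand-alone sketch of that protocol, with plain Java types in place of FileRef and MapCounter:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ScanRefSketch {
  private final Map<String,Integer> refCounts = new HashMap<>();
  private final Set<String> deleteAfterScan = new HashSet<>();

  public synchronized void reserve(String file) {
    refCounts.merge(file, 1, Integer::sum);
  }

  public synchronized void unreserve(String file) {
    int n = refCounts.merge(file, -1, Integer::sum);
    if (n == 0) {
      refCounts.remove(file);
      if (deleteAfterScan.remove(file))
        System.out.println("safe to delete " + file);
    } else if (n < 0) {
      throw new IllegalStateException("Scan ref count for " + file + " is " + n);
    }
  }

  public synchronized void removeAfterScan(String file) {
    if (refCounts.getOrDefault(file, 0) == 0)
      System.out.println("safe to delete " + file);
    else
      deleteAfterScan.add(file); // defer until the last reservation returns
  }

  public static void main(String[] args) {
    ScanRefSketch m = new ScanRefSketch();
    m.reserve("f1");
    m.removeAfterScan("f1"); // deferred: a scan still holds f1
    m.unreserve("f1");       // prints "safe to delete f1"
  }
}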

