From: ktur...@apache.org
Subject: [accumulo] 01/02: Merge branch '1.9'
Date: Wed, 02 Jan 2019 22:05:28 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit c0f9e322111c8f6bcbfcaf1c67160756153b63a5
Merge: dbc7444 2c2840b
Author: Keith Turner <kturner@apache.org>
AuthorDate: Wed Jan 2 16:53:01 2019 -0500

    Merge branch '1.9'

 .../core/client/admin/NewTableConfiguration.java   |   8 +
 .../core/clientImpl/InstanceOperationsImpl.java    |  19 ++
 .../core/clientImpl/NamespaceOperationsImpl.java   |  21 ++
 .../core/clientImpl/TableOperationsImpl.java       |  47 +++-
 .../accumulo/core/util/LocalityGroupUtil.java      |  45 +++-
 .../server/client/ClientServiceHandler.java        |  16 +-
 .../accumulo/server/log/WalStateManager.java       |  24 +-
 .../master/tableOps/bulkVer1/BulkImport.java       |  17 +-
 .../org/apache/accumulo/tserver/InMemoryMap.java   |   7 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  97 +++++---
 .../org/apache/accumulo/tserver/log/DfsLogger.java |  15 +-
 .../accumulo/tserver/log/TabletServerLogger.java   |   1 -
 .../apache/accumulo/tserver/tablet/Compactor.java  |  16 +-
 .../accumulo/tserver/tablet/MinorCompactor.java    |  10 +
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  36 ++-
 .../accumulo/tserver/tablet/TabletMemory.java      |  15 +-
 .../apache/accumulo/tserver/InMemoryMapTest.java   |  14 +-
 .../org/apache/accumulo/test/InMemoryMapIT.java    |  15 +-
 .../test/functional/BadLocalityGroupMincIT.java    |  91 ++++++++
 .../accumulo/test/functional/BulkFailureIT.java    | 254 +++++++++++++++++++++
 .../apache/accumulo/test/functional/BulkIT.java    |   2 -
 .../test/functional/ManyWriteAheadLogsIT.java      | 186 +++++++++++++++
 .../accumulo/test/functional/RowDeleteIT.java      |   1 -
 .../accumulo/test/functional/WALSunnyDayIT.java    |  58 +++--
 24 files changed, 882 insertions(+), 133 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
index 1f3335d,fc60b0c..defd18a
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
@@@ -19,33 -19,17 +19,34 @@@ package org.apache.accumulo.core.client
  import static com.google.common.base.Preconditions.checkArgument;
  import static java.util.Objects.requireNonNull;
  
 +import java.util.Arrays;
 +import java.util.Collection;
  import java.util.Collections;
 +import java.util.EnumSet;
  import java.util.HashMap;
  import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Objects;
 +import java.util.Set;
 +import java.util.SortedSet;
  
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 +import org.apache.accumulo.core.client.summary.Summarizer;
 +import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 +import org.apache.accumulo.core.clientImpl.TableOperationsHelper;
 +import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
  import org.apache.accumulo.core.iterators.user.VersioningIterator;
  import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 +import org.apache.accumulo.core.summary.SummarizerConfigurationUtil;
  import org.apache.accumulo.core.util.LocalityGroupUtil;
+ import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 -import org.slf4j.LoggerFactory;
 +import org.apache.hadoop.io.Text;
 +
 +import com.google.common.collect.ImmutableSortedSet;
  
  /**
   * This object stores table creation parameters. Currently includes: {@link TimeType}, whether to
@@@ -141,14 -90,20 +142,21 @@@ public class NewTableConfiguration 
     *          additional properties to add to the table when it is created
     * @return this
     */
 -  public NewTableConfiguration setProperties(Map<String,String> prop) {
 -    checkArgument(prop != null, "properties is null");
 -    SamplerConfigurationImpl.checkDisjoint(prop, samplerConfiguration);
 +  public NewTableConfiguration setProperties(Map<String,String> props) {
 +    checkArgument(props != null, "properties is null");
 +    checkDisjoint(props, samplerProps, "sampler");
 +    checkDisjoint(props, summarizerProps, "summarizer");
 +    checkDisjoint(props, localityProps, "locality group");
 +    checkDisjoint(props, iteratorProps, "iterator");
 +    checkTableProperties(props);
+ 
+     try {
 -      LocalityGroupUtil.checkLocalityGroups(prop.entrySet());
++      LocalityGroupUtil.checkLocalityGroups(props.entrySet());
+     } catch (LocalityGroupConfigurationError e) {
 -      LoggerFactory.getLogger(NewTableConfiguration.class).warn(
 -          "Setting new table properties with bad locality group config.   Even though this warning"
 -              + " was displayed, the properties were set. props:" + prop,
 -          e);
++      throw new IllegalArgumentException(e);
+     }
+ 
 -    this.properties = new HashMap<>(prop);
 +    this.properties = new HashMap<>(props);
      return this;
    }
  
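For context, a minimal sketch (not part of this commit) of the behavior change above: setProperties() now validates locality group configuration up front and throws IllegalArgumentException, where the 1.9 code only logged a warning and applied the properties anyway. The property names follow the table.groups.enabled / table.group.* convention, and the sketch assumes, as LocalityGroupUtil checks, that assigning one column family to two groups is a configuration error.

import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.admin.NewTableConfiguration;

public class SetPropertiesSketch {
  public static void main(String[] args) {
    Map<String,String> props = new HashMap<>();
    props.put("table.groups.enabled", "g1,g2");
    props.put("table.group.g1", "famA,famB");
    props.put("table.group.g2", "famB"); // famB is in both groups: invalid

    try {
      new NewTableConfiguration().setProperties(props);
    } catch (IllegalArgumentException e) {
      // After this merge the bad config is rejected here, before any table
      // is created, instead of being applied with only a warning logged.
      System.err.println("rejected: " + e.getMessage());
    }
  }
}

Failing fast is safe here because nothing has been applied yet; the server-side setters changed below keep a warn-only check, since a live config that spans multiple properties can be momentarily inconsistent mid-update.
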
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index 9f4bd58,0000000..5813f27
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@@ -1,216 -1,0 +1,235 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.clientImpl;
 +
 +import static com.google.common.base.Preconditions.checkArgument;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.UUID;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.ActiveCompaction;
 +import org.apache.accumulo.core.client.admin.ActiveScan;
 +import org.apache.accumulo.core.client.admin.InstanceOperations;
 +import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 +import org.apache.accumulo.core.trace.Tracer;
 +import org.apache.accumulo.core.util.AddressUtil;
 +import org.apache.accumulo.core.util.HostAndPort;
++import org.apache.accumulo.core.util.LocalityGroupUtil;
++import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
++import org.slf4j.LoggerFactory;
 +
 +/**
 + * Provides a class for administering the accumulo instance
 + */
 +public class InstanceOperationsImpl implements InstanceOperations {
 +  private final ClientContext context;
 +
 +  public InstanceOperationsImpl(ClientContext context) {
 +    checkArgument(context != null, "context is null");
 +    this.context = context;
 +  }
 +
 +  @Override
 +  public void setProperty(final String property, final String value)
 +      throws AccumuloException, AccumuloSecurityException, IllegalArgumentException {
 +    checkArgument(property != null, "property is null");
 +    checkArgument(value != null, "value is null");
 +    MasterClient.executeVoid(context, client -> client.setSystemProperty(Tracer.traceInfo(),
 +        context.rpcCreds(), property, value));
++    checkLocalityGroups(property);
 +  }
 +
 +  @Override
 +  public void removeProperty(final String property)
 +      throws AccumuloException, AccumuloSecurityException {
 +    checkArgument(property != null, "property is null");
 +    MasterClient.executeVoid(context,
 +        client -> client.removeSystemProperty(Tracer.traceInfo(), context.rpcCreds(), property));
++    checkLocalityGroups(property);
++  }
++
++  void checkLocalityGroups(String propChanged) throws AccumuloSecurityException, AccumuloException {
++    if (LocalityGroupUtil.isLocalityGroupProperty(propChanged)) {
++      try {
++        LocalityGroupUtil.checkLocalityGroups(getSystemConfiguration().entrySet());
++      } catch (LocalityGroupConfigurationError | RuntimeException e) {
++        LoggerFactory.getLogger(this.getClass()).warn("Changing '" + propChanged
++            + "' resulted in bad locality group config. This may be a transient situation since "
++            + "the config spreads over multiple properties. Setting properties in a different "
++            + "order may help. Even though this warning was displayed, the property was updated. "
++            + "Please check your config to ensure consistency.", e);
++      }
++    }
 +  }
 +
 +  @Override
 +  public Map<String,String> getSystemConfiguration()
 +      throws AccumuloException, AccumuloSecurityException {
 +    return ServerClient.execute(context, client -> client.getConfiguration(Tracer.traceInfo(),
 +        context.rpcCreds(), ConfigurationType.CURRENT));
 +  }
 +
 +  @Override
 +  public Map<String,String> getSiteConfiguration()
 +      throws AccumuloException, AccumuloSecurityException {
 +    return ServerClient.execute(context, client -> client.getConfiguration(Tracer.traceInfo(),
 +        context.rpcCreds(), ConfigurationType.SITE));
 +  }
 +
 +  @Override
 +  public List<String> getTabletServers() {
 +    ZooCache cache = context.getZooCache();
 +    String path = context.getZooKeeperRoot() + Constants.ZTSERVERS;
 +    List<String> results = new ArrayList<>();
 +    for (String candidate : cache.getChildren(path)) {
 +      List<String> children = cache.getChildren(path + "/" + candidate);
 +      if (children != null && children.size() > 0) {
 +        List<String> copy = new ArrayList<>(children);
 +        Collections.sort(copy);
 +        byte[] data = cache.get(path + "/" + candidate + "/" + copy.get(0));
 +        if (data != null && !"master".equals(new String(data, UTF_8))) {
 +          results.add(candidate);
 +        }
 +      }
 +    }
 +    return results;
 +  }
 +
 +  @Override
 +  public List<ActiveScan> getActiveScans(String tserver)
 +      throws AccumuloException, AccumuloSecurityException {
 +    final HostAndPort parsedTserver = HostAndPort.fromString(tserver);
 +    Client client = null;
 +    try {
 +      client = ThriftUtil.getTServerClient(parsedTserver, context);
 +
 +      List<ActiveScan> as = new ArrayList<>();
 +      for (org.apache.accumulo.core.tabletserver.thrift.ActiveScan activeScan : client
 +          .getActiveScans(Tracer.traceInfo(), context.rpcCreds())) {
 +        try {
 +          as.add(new ActiveScanImpl(context, activeScan));
 +        } catch (TableNotFoundException e) {
 +          throw new AccumuloException(e);
 +        }
 +      }
 +      return as;
 +    } catch (TTransportException e) {
 +      throw new AccumuloException(e);
 +    } catch (ThriftSecurityException e) {
 +      throw new AccumuloSecurityException(e.user, e.code, e);
 +    } catch (TException e) {
 +      throw new AccumuloException(e);
 +    } finally {
 +      if (client != null)
 +        ThriftUtil.returnClient(client);
 +    }
 +  }
 +
 +  @Override
 +  public boolean testClassLoad(final String className, final String asTypeName)
 +      throws AccumuloException, AccumuloSecurityException {
 +    return ServerClient.execute(context,
 +        client -> client.checkClass(Tracer.traceInfo(), context.rpcCreds(), className, asTypeName));
 +  }
 +
 +  @Override
 +  public List<ActiveCompaction> getActiveCompactions(String tserver)
 +      throws AccumuloException, AccumuloSecurityException {
 +    final HostAndPort parsedTserver = HostAndPort.fromString(tserver);
 +    Client client = null;
 +    try {
 +      client = ThriftUtil.getTServerClient(parsedTserver, context);
 +
 +      List<ActiveCompaction> as = new ArrayList<>();
 +      for (org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction activeCompaction : client
 +          .getActiveCompactions(Tracer.traceInfo(), context.rpcCreds())) {
 +        as.add(new ActiveCompactionImpl(context, activeCompaction));
 +      }
 +      return as;
 +    } catch (TTransportException e) {
 +      throw new AccumuloException(e);
 +    } catch (ThriftSecurityException e) {
 +      throw new AccumuloSecurityException(e.user, e.code, e);
 +    } catch (TException e) {
 +      throw new AccumuloException(e);
 +    } finally {
 +      if (client != null)
 +        ThriftUtil.returnClient(client);
 +    }
 +  }
 +
 +  @Override
 +  public void ping(String tserver) throws AccumuloException {
 +    TTransport transport = null;
 +    try {
 +      transport = ThriftUtil.createTransport(AddressUtil.parseAddress(tserver, false), context);
 +      TabletClientService.Client client = ThriftUtil
 +          .createClient(new TabletClientService.Client.Factory(), transport);
 +      client.getTabletServerStatus(Tracer.traceInfo(), context.rpcCreds());
 +    } catch (TException e) {
 +      throw new AccumuloException(e);
 +    } finally {
 +      if (transport != null) {
 +        transport.close();
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void waitForBalance() throws AccumuloException {
 +    try {
 +      MasterClient.executeVoid(context, client -> client.waitForBalance(Tracer.traceInfo()));
 +    } catch (AccumuloSecurityException ex) {
 +      // should never happen
 +      throw new RuntimeException("Unexpected exception thrown", ex);
 +    }
 +
 +  }
 +
 +  /**
 +   * Given a zooCache and instanceId, look up the instance name.
 +   */
 +  public static String lookupInstanceName(ZooCache zooCache, UUID instanceId) {
 +    checkArgument(zooCache != null, "zooCache is null");
 +    checkArgument(instanceId != null, "instanceId is null");
 +    for (String name : zooCache.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) {
 +      String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + name;
 +      byte[] bytes = zooCache.get(instanceNamePath);
 +      UUID iid = UUID.fromString(new String(bytes, UTF_8));
 +      if (iid.equals(instanceId)) {
 +        return name;
 +      }
 +    }
 +    return null;
 +  }
 +}
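
A minimal sketch (not part of the commit) of the instance-level check added above; how the InstanceOperations handle is obtained is elided. The point is that setProperty() and removeProperty() still apply the change, then re-read the system configuration and log a WARN, rather than throwing, if the locality group settings ended up inconsistent.

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.admin.InstanceOperations;

class SystemPropertySketch {
  static void setGroupProperty(InstanceOperations ops)
      throws AccumuloException, AccumuloSecurityException {
    // The update always goes through; if the resulting system-wide locality
    // group config is inconsistent, checkLocalityGroups() only logs a
    // warning, since a later property change may restore consistency.
    ops.setProperty("table.group.g1", "famA");
  }
}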
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java
index cf6f92a,0000000..041a960
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java
@@@ -1,273 -1,0 +1,294 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.clientImpl;
 +
 +import static com.google.common.base.Preconditions.checkArgument;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.SortedSet;
 +import java.util.TreeMap;
 +import java.util.TreeSet;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.NamespaceExistsException;
 +import org.apache.accumulo.core.client.NamespaceNotEmptyException;
 +import org.apache.accumulo.core.client.NamespaceNotFoundException;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.constraints.Constraint;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.master.thrift.FateOperation;
 +import org.apache.accumulo.core.trace.Tracer;
++import org.apache.accumulo.core.util.LocalityGroupUtil;
++import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 +import org.apache.accumulo.core.util.OpTimer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class NamespaceOperationsImpl extends NamespaceOperationsHelper {
 +  private final ClientContext context;
 +  private TableOperationsImpl tableOps;
 +
 +  private static final Logger log = LoggerFactory.getLogger(TableOperations.class);
 +
 +  public NamespaceOperationsImpl(ClientContext context, TableOperationsImpl tableOps) {
 +    checkArgument(context != null, "context is null");
 +    this.context = context;
 +    this.tableOps = tableOps;
 +  }
 +
 +  @Override
 +  public SortedSet<String> list() {
 +
 +    OpTimer timer = null;
 +
 +    if (log.isTraceEnabled()) {
 +      log.trace("tid={} Fetching list of namespaces...", Thread.currentThread().getId());
 +      timer = new OpTimer().start();
 +    }
 +
 +    TreeSet<String> namespaces = new TreeSet<>(Namespaces.getNameToIdMap(context).keySet());
 +
 +    if (timer != null) {
 +      timer.stop();
 +      log.trace("tid={} Fetched {} namespaces in {}", Thread.currentThread().getId(),
 +          namespaces.size(), String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
 +    }
 +
 +    return namespaces;
 +  }
 +
 +  @Override
 +  public boolean exists(String namespace) {
 +    checkArgument(namespace != null, "namespace is null");
 +
 +    OpTimer timer = null;
 +
 +    if (log.isTraceEnabled()) {
 +      log.trace("tid={} Checking if namespace {} exists", Thread.currentThread().getId(),
 +          namespace);
 +      timer = new OpTimer().start();
 +    }
 +
 +    boolean exists = Namespaces.namespaceNameExists(context, namespace);
 +
 +    if (timer != null) {
 +      timer.stop();
 +      log.trace("tid={} Checked existence of {} in {}", Thread.currentThread().getId(), exists,
 +          String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
 +    }
 +
 +    return exists;
 +  }
 +
 +  @Override
 +  public void create(String namespace)
 +      throws AccumuloException, AccumuloSecurityException, NamespaceExistsException {
 +    checkArgument(namespace != null, "namespace is null");
 +
 +    try {
 +      doNamespaceFateOperation(FateOperation.NAMESPACE_CREATE,
 +          Arrays.asList(ByteBuffer.wrap(namespace.getBytes(UTF_8))), Collections.emptyMap(),
 +          namespace);
 +    } catch (NamespaceNotFoundException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +  }
 +
 +  @Override
 +  public void delete(String namespace) throws AccumuloException, AccumuloSecurityException,
 +      NamespaceNotFoundException, NamespaceNotEmptyException {
 +    checkArgument(namespace != null, "namespace is null");
 +    Namespace.ID namespaceId = Namespaces.getNamespaceId(context, namespace);
 +
 +    if (namespaceId.equals(Namespace.ID.ACCUMULO) || namespaceId.equals(Namespace.ID.DEFAULT)) {
 +      Credentials credentials = context.getCredentials();
 +      log.debug("{} attempted to delete the {} namespace", credentials.getPrincipal(), namespaceId);
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.UNSUPPORTED_OPERATION);
 +    }
 +
 +    if (Namespaces.getTableIds(context, namespaceId).size() > 0) {
 +      throw new NamespaceNotEmptyException(namespaceId.canonicalID(), namespace, null);
 +    }
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(namespace.getBytes(UTF_8)));
 +    Map<String,String> opts = new HashMap<>();
 +
 +    try {
 +      doNamespaceFateOperation(FateOperation.NAMESPACE_DELETE, args, opts, namespace);
 +    } catch (NamespaceExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +
 +  }
 +
 +  @Override
 +  public void rename(String oldNamespaceName, String newNamespaceName)
 +      throws AccumuloSecurityException, NamespaceNotFoundException, AccumuloException,
 +      NamespaceExistsException {
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldNamespaceName.getBytes(UTF_8)),
 +        ByteBuffer.wrap(newNamespaceName.getBytes(UTF_8)));
 +    Map<String,String> opts = new HashMap<>();
 +    doNamespaceFateOperation(FateOperation.NAMESPACE_RENAME, args, opts, oldNamespaceName);
 +  }
 +
 +  @Override
 +  public void setProperty(final String namespace, final String property, final String value)
 +      throws AccumuloException, AccumuloSecurityException, NamespaceNotFoundException {
 +    checkArgument(namespace != null, "namespace is null");
 +    checkArgument(property != null, "property is null");
 +    checkArgument(value != null, "value is null");
 +
 +    MasterClient.executeNamespace(context, client -> client.setNamespaceProperty(Tracer.traceInfo(),
 +        context.rpcCreds(), namespace, property, value));
++    checkLocalityGroups(namespace, property);
 +  }
 +
 +  @Override
 +  public void removeProperty(final String namespace, final String property)
 +      throws AccumuloException, AccumuloSecurityException, NamespaceNotFoundException {
 +    checkArgument(namespace != null, "namespace is null");
 +    checkArgument(property != null, "property is null");
 +
 +    MasterClient.executeNamespace(context, client -> client
 +        .removeNamespaceProperty(Tracer.traceInfo(), context.rpcCreds(), namespace, property));
++    checkLocalityGroups(namespace, property);
 +  }
 +
 +  @Override
 +  public Iterable<Entry<String,String>> getProperties(final String namespace)
 +      throws AccumuloException, NamespaceNotFoundException {
 +    checkArgument(namespace != null, "namespace is null");
 +    try {
 +      return ServerClient.executeRaw(context, client -> client
 +          .getNamespaceConfiguration(Tracer.traceInfo(), context.rpcCreds(), namespace)).entrySet();
 +    } catch (ThriftTableOperationException e) {
 +      switch (e.getType()) {
 +        case NAMESPACE_NOTFOUND:
 +          throw new NamespaceNotFoundException(e);
 +        case OTHER:
 +        default:
 +          throw new AccumuloException(e.description, e);
 +      }
 +    } catch (AccumuloException e) {
 +      throw e;
 +    } catch (Exception e) {
 +      throw new AccumuloException(e);
 +    }
 +
 +  }
 +
 +  @Override
 +  public Map<String,String> namespaceIdMap() {
 +    return Namespaces.getNameToIdMap(context).entrySet().stream()
 +        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().canonicalID(), (v1, v2) -> {
 +          throw new RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2));
 +        }, TreeMap::new));
 +  }
 +
 +  @Override
 +  public boolean testClassLoad(final String namespace, final String className,
 +      final String asTypeName)
 +      throws NamespaceNotFoundException, AccumuloException, AccumuloSecurityException {
 +    checkArgument(namespace != null, "namespace is null");
 +    checkArgument(className != null, "className is null");
 +    checkArgument(asTypeName != null, "asTypeName is null");
 +
 +    try {
 +      return ServerClient.executeRaw(context,
 +          client -> client.checkNamespaceClass(Tracer.traceInfo(), context.rpcCreds(), namespace,
 +              className, asTypeName));
 +    } catch (ThriftTableOperationException e) {
 +      switch (e.getType()) {
 +        case NAMESPACE_NOTFOUND:
 +          throw new NamespaceNotFoundException(e);
 +        default:
 +          throw new AccumuloException(e.description, e);
 +      }
 +    } catch (ThriftSecurityException e) {
 +      throw new AccumuloSecurityException(e.user, e.code, e);
 +    } catch (AccumuloException e) {
 +      throw e;
 +    } catch (Exception e) {
 +      throw new AccumuloException(e);
 +    }
 +  }
 +
 +  @Override
 +  public void attachIterator(String namespace, IteratorSetting setting,
 +      EnumSet<IteratorScope> scopes)
 +      throws AccumuloSecurityException, AccumuloException, NamespaceNotFoundException {
 +    testClassLoad(namespace, setting.getIteratorClass(), SortedKeyValueIterator.class.getName());
 +    super.attachIterator(namespace, setting, scopes);
 +  }
 +
 +  @Override
 +  public int addConstraint(String namespace, String constraintClassName)
 +      throws AccumuloException, AccumuloSecurityException, NamespaceNotFoundException {
 +    testClassLoad(namespace, constraintClassName, Constraint.class.getName());
 +    return super.addConstraint(namespace, constraintClassName);
 +  }
 +
 +  private String doNamespaceFateOperation(FateOperation op, List<ByteBuffer> args,
 +      Map<String,String> opts, String namespace) throws AccumuloSecurityException,
 +      AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
 +    try {
 +      return tableOps.doFateOperation(op, args, opts, namespace);
 +    } catch (TableExistsException | TableNotFoundException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +  }
++
++  private void checkLocalityGroups(String namespace, String propChanged)
++      throws AccumuloException, NamespaceNotFoundException {
++    if (LocalityGroupUtil.isLocalityGroupProperty(propChanged)) {
++      Iterable<Entry<String,String>> allProps = getProperties(namespace);
++      try {
++        LocalityGroupUtil.checkLocalityGroups(allProps);
++      } catch (LocalityGroupConfigurationError | RuntimeException e) {
++        LoggerFactory.getLogger(this.getClass()).warn("Changing '" + propChanged
++            + "' for namespace '" + namespace
++            + "' resulted in bad locality group config. This may be a transient situation since the"
++            + " config spreads over multiple properties. Setting properties in a different order "
++            + "may help. Even though this warning was displayed, the property was updated. Please "
++            + "check your config to ensure consistency.", e);
++      }
++    }
++  }
 +}
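
The namespace path gets the same warn-only treatment, and a short sketch (again, not from the commit) shows why it cannot fail fast: a locality group configuration spans two properties, so the first of two setProperty() calls is inconsistent on its own and only the completed pair is valid.

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.NamespaceNotFoundException;
import org.apache.accumulo.core.client.admin.NamespaceOperations;

class NamespacePropertySketch {
  static void configureGroups(NamespaceOperations ops, String ns)
      throws AccumuloException, AccumuloSecurityException, NamespaceNotFoundException {
    // Between these two calls the config is momentarily inconsistent (g1 is
    // enabled but has no column families yet); this is the "transient
    // situation" the new warning message describes.
    ops.setProperty(ns, "table.groups.enabled", "g1");
    ops.setProperty(ns, "table.group.g1", "famA");
  }
}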
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 853b8b1,0000000..4ce797f
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@@ -1,1873 -1,0 +1,1906 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.clientImpl;
 +
 +import static com.google.common.base.Preconditions.checkArgument;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static java.util.Objects.requireNonNull;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static java.util.stream.Collectors.toSet;
 +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 +
 +import java.io.BufferedReader;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.nio.ByteBuffer;
 +import java.security.SecureRandom;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Objects;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeMap;
 +import java.util.TreeSet;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Predicate;
 +import java.util.regex.Pattern;
 +import java.util.stream.Collectors;
 +import java.util.zip.ZipEntry;
 +import java.util.zip.ZipInputStream;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.NamespaceExistsException;
 +import org.apache.accumulo.core.client.NamespaceNotFoundException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.TableOfflineException;
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
 +import org.apache.accumulo.core.client.admin.DiskUsage;
 +import org.apache.accumulo.core.client.admin.FindMax;
 +import org.apache.accumulo.core.client.admin.Locations;
 +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.client.admin.SummaryRetriever;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 +import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 +import org.apache.accumulo.core.client.summary.Summary;
 +import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation;
 +import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client;
 +import org.apache.accumulo.core.clientImpl.thrift.TDiskUsage;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.ConfigurationCopy;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.constraints.Constraint;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TabletId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.TabletIdImpl;
 +import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
 +import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
 +import org.apache.accumulo.core.dataImpl.thrift.TSummarizerConfiguration;
 +import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
 +import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.master.thrift.FateOperation;
 +import org.apache.accumulo.core.master.thrift.MasterClientService;
 +import org.apache.accumulo.core.metadata.MetadataServicer;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.summary.SummarizerConfigurationUtil;
 +import org.apache.accumulo.core.summary.SummaryCollection;
 +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.trace.Tracer;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.HostAndPort;
 +import org.apache.accumulo.core.util.LocalityGroupUtil;
++import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 +import org.apache.accumulo.core.util.MapCounter;
 +import org.apache.accumulo.core.util.NamingThreadFactory;
 +import org.apache.accumulo.core.util.OpTimer;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.accumulo.core.volume.VolumeConfiguration;
 +import org.apache.accumulo.fate.util.Retry;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.thrift.TApplicationException;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.transport.TTransportException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.base.Preconditions;
 +
 +public class TableOperationsImpl extends TableOperationsHelper {
 +
 +  public static final String CLONE_EXCLUDE_PREFIX = "!";
 +  private static final Logger log = LoggerFactory.getLogger(TableOperations.class);
 +  private final ClientContext context;
 +
 +  public TableOperationsImpl(ClientContext context) {
 +    checkArgument(context != null, "context is null");
 +    this.context = context;
 +  }
 +
 +  @Override
 +  public SortedSet<String> list() {
 +
 +    OpTimer timer = null;
 +
 +    if (log.isTraceEnabled()) {
 +      log.trace("tid={} Fetching list of tables...", Thread.currentThread().getId());
 +      timer = new OpTimer().start();
 +    }
 +
 +    TreeSet<String> tableNames = new TreeSet<>(Tables.getNameToIdMap(context).keySet());
 +
 +    if (timer != null) {
 +      timer.stop();
 +      log.trace("tid={} Fetched {} table names in {}", Thread.currentThread().getId(),
 +          tableNames.size(), String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
 +    }
 +
 +    return tableNames;
 +  }
 +
 +  @Override
 +  public boolean exists(String tableName) {
 +    checkArgument(tableName != null, "tableName is null");
 +    if (tableName.equals(MetadataTable.NAME) || tableName.equals(RootTable.NAME))
 +      return true;
 +
 +    OpTimer timer = null;
 +
 +    if (log.isTraceEnabled()) {
 +      log.trace("tid={} Checking if table {} exists...", Thread.currentThread().getId(), tableName);
 +      timer = new OpTimer().start();
 +    }
 +
 +    boolean exists = Tables.getNameToIdMap(context).containsKey(tableName);
 +
 +    if (timer != null) {
 +      timer.stop();
 +      log.trace("tid={} Checked existence of {} in {}", Thread.currentThread().getId(), exists,
 +          String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
 +    }
 +
 +    return exists;
 +  }
 +
 +  @Override
 +  public void create(String tableName)
 +      throws AccumuloException, AccumuloSecurityException, TableExistsException {
 +    create(tableName, new NewTableConfiguration());
 +  }
 +
 +  @Override
 +  public void create(String tableName, NewTableConfiguration ntc)
 +      throws AccumuloException, AccumuloSecurityException, TableExistsException {
 +    checkArgument(tableName != null, "tableName is null");
 +    checkArgument(ntc != null, "ntc is null");
 +
 +    List<ByteBuffer> args = new ArrayList<>();
 +    args.add(ByteBuffer.wrap(tableName.getBytes(UTF_8)));
 +    args.add(ByteBuffer.wrap(ntc.getTimeType().name().getBytes(UTF_8)));
 +    // Send info relating to initial table creation, i.e., create online or offline
 +    args.add(ByteBuffer.wrap(ntc.getInitialTableState().name().getBytes(UTF_8)));
 +    // Check for possible initial splits to be added at table creation
 +    // Always send number of initial splits to be created, even if zero. If greater than zero,
 +    // add the splits to the argument List which will be used by the FATE operations.
 +    int numSplits = ntc.getSplits().size();
 +    args.add(ByteBuffer.wrap(String.valueOf(numSplits).getBytes(UTF_8)));
 +    if (numSplits > 0) {
 +      for (Text t : ntc.getSplits()) {
 +        args.add(TextUtil.getByteBuffer(t));
 +      }
 +    }
 +
 +    Map<String,String> opts = ntc.getProperties();
 +
 +    try {
 +      doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_CREATE, args,
 +          opts);
 +    } catch (TableNotFoundException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +  }
 +
 +  private long beginFateOperation() throws ThriftSecurityException, TException {
 +    while (true) {
 +      MasterClientService.Iface client = null;
 +      try {
 +        client = MasterClient.getConnectionWithRetry(context);
 +        return client.beginFateOperation(Tracer.traceInfo(), context.rpcCreds());
 +      } catch (TTransportException tte) {
 +        log.debug("Failed to call beginFateOperation(), retrying ... ", tte);
 +        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      } catch (ThriftNotActiveServiceException e) {
 +        // Let it loop, fetching a new location
 +        log.debug("Contacted a Master which is no longer active, retrying");
 +        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      } finally {
 +        MasterClient.close(client);
 +      }
 +    }
 +  }
 +
 +  // This method is for retrying in the case of network failures; anything else it passes to the
 +  // caller to deal with
 +  private void executeFateOperation(long opid, FateOperation op, List<ByteBuffer> args,
 +      Map<String,String> opts, boolean autoCleanUp)
 +      throws ThriftSecurityException, TException, ThriftTableOperationException {
 +    while (true) {
 +      MasterClientService.Iface client = null;
 +      try {
 +        client = MasterClient.getConnectionWithRetry(context);
 +        client.executeFateOperation(Tracer.traceInfo(), context.rpcCreds(), opid, op, args, opts,
 +            autoCleanUp);
 +        return;
 +      } catch (TTransportException tte) {
 +        log.debug("Failed to call executeFateOperation(), retrying ... ", tte);
 +        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      } catch (ThriftNotActiveServiceException e) {
 +        // Let it loop, fetching a new location
 +        log.debug("Contacted a Master which is no longer active, retrying");
 +        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      } finally {
 +        MasterClient.close(client);
 +      }
 +    }
 +  }
 +
 +  private String waitForFateOperation(long opid)
 +      throws ThriftSecurityException, TException, ThriftTableOperationException {
 +    while (true) {
 +      MasterClientService.Iface client = null;
 +      try {
 +        client = MasterClient.getConnectionWithRetry(context);
 +        return client.waitForFateOperation(Tracer.traceInfo(), context.rpcCreds(), opid);
 +      } catch (TTransportException tte) {
 +        log.debug("Failed to call waitForFateOperation(), retrying ... ", tte);
 +        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      } catch (ThriftNotActiveServiceException e) {
 +        // Let it loop, fetching a new location
 +        log.debug("Contacted a Master which is no longer active, retrying");
 +        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      } finally {
 +        MasterClient.close(client);
 +      }
 +    }
 +  }
 +
 +  private void finishFateOperation(long opid) throws ThriftSecurityException, TException {
 +    while (true) {
 +      MasterClientService.Iface client = null;
 +      try {
 +        client = MasterClient.getConnectionWithRetry(context);
 +        client.finishFateOperation(Tracer.traceInfo(), context.rpcCreds(), opid);
 +        break;
 +      } catch (TTransportException tte) {
 +        log.debug("Failed to call finishFateOperation(), retrying ... ", tte);
 +        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      } catch (ThriftNotActiveServiceException e) {
 +        // Let it loop, fetching a new location
 +        log.debug("Contacted a Master which is no longer active, retrying");
 +        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      } finally {
 +        MasterClient.close(client);
 +      }
 +    }
 +  }
 +
 +  String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts,
 +      String tableOrNamespaceName)
 +      throws AccumuloSecurityException, TableExistsException, TableNotFoundException,
 +      AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
 +    return doFateOperation(op, args, opts, tableOrNamespaceName, true);
 +  }
 +
 +  String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts,
 +      String tableOrNamespaceName, boolean wait)
 +      throws AccumuloSecurityException, TableExistsException, TableNotFoundException,
 +      AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
 +    Long opid = null;
 +
 +    try {
 +      opid = beginFateOperation();
 +      executeFateOperation(opid, op, args, opts, !wait);
 +      if (!wait) {
 +        opid = null;
 +        return null;
 +      }
 +      return waitForFateOperation(opid);
 +    } catch (ThriftSecurityException e) {
 +      switch (e.getCode()) {
 +        case TABLE_DOESNT_EXIST:
 +          throw new TableNotFoundException(null, tableOrNamespaceName,
 +              "Target table does not exist");
 +        case NAMESPACE_DOESNT_EXIST:
 +          throw new NamespaceNotFoundException(null, tableOrNamespaceName,
 +              "Target namespace does not exist");
 +        default:
 +          String tableInfo = Tables.getPrintableTableInfoFromName(context, tableOrNamespaceName);
 +          throw new AccumuloSecurityException(e.user, e.code, tableInfo, e);
 +      }
 +    } catch (ThriftTableOperationException e) {
 +      switch (e.getType()) {
 +        case EXISTS:
 +          throw new TableExistsException(e);
 +        case NOTFOUND:
 +          throw new TableNotFoundException(e);
 +        case NAMESPACE_EXISTS:
 +          throw new NamespaceExistsException(e);
 +        case NAMESPACE_NOTFOUND:
 +          throw new NamespaceNotFoundException(e);
 +        case OFFLINE:
 +          throw new TableOfflineException(
 +              Tables.getTableOfflineMsg(context, Tables.getTableId(context, tableOrNamespaceName)));
 +        default:
 +          throw new AccumuloException(e.description, e);
 +      }
 +    } catch (Exception e) {
 +      throw new AccumuloException(e.getMessage(), e);
 +    } finally {
 +      Tables.clearCache(context);
 +      // always finish table op, even when exception
 +      if (opid != null)
 +        try {
 +          finishFateOperation(opid);
 +        } catch (Exception e) {
 +          log.warn("Exception thrown while finishing fate table operation", e);
 +        }
 +    }
 +  }
 +
 +  private static class SplitEnv {
 +    private String tableName;
 +    private Table.ID tableId;
 +    private ExecutorService executor;
 +    private CountDownLatch latch;
 +    private AtomicReference<Throwable> exception;
 +
 +    SplitEnv(String tableName, Table.ID tableId, ExecutorService executor, CountDownLatch latch,
 +        AtomicReference<Throwable> exception) {
 +      this.tableName = tableName;
 +      this.tableId = tableId;
 +      this.executor = executor;
 +      this.latch = latch;
 +      this.exception = exception;
 +    }
 +  }
 +
 +  private class SplitTask implements Runnable {
 +
 +    private List<Text> splits;
 +    private SplitEnv env;
 +
 +    SplitTask(SplitEnv env, List<Text> splits) {
 +      this.env = env;
 +      this.splits = splits;
 +    }
 +
 +    @Override
 +    public void run() {
 +      try {
 +        if (env.exception.get() != null)
 +          return;
 +
 +        if (splits.size() <= 2) {
 +          addSplits(env.tableName, new TreeSet<>(splits), env.tableId);
 +          for (int i = 0; i < splits.size(); i++)
 +            env.latch.countDown();
 +          return;
 +        }
 +
 +        int mid = splits.size() / 2;
 +
 +        // split at the middle split point to ensure that child tasks split different
 +        // tablets and can therefore run in parallel
 +        addSplits(env.tableName, new TreeSet<>(splits.subList(mid, mid + 1)), env.tableId);
 +        env.latch.countDown();
 +
 +        env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
 +        env.executor.execute(new SplitTask(env, splits.subList(mid + 1, splits.size())));
 +
 +      } catch (Throwable t) {
 +        env.exception.compareAndSet(null, t);
 +      }
 +    }
 +
 +  }
 +
 +  @Override
 +  public void addSplits(String tableName, SortedSet<Text> partitionKeys)
 +      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
 +    Table.ID tableId = Tables.getTableId(context, tableName);
 +
 +    List<Text> splits = new ArrayList<>(partitionKeys);
 +    // should be sorted because we copied from a sorted set, but that makes assumptions about
 +    // how the copy was done, so re-sort to be sure.
 +    Collections.sort(splits);
 +
 +    CountDownLatch latch = new CountDownLatch(splits.size());
 +    AtomicReference<Throwable> exception = new AtomicReference<>(null);
 +
 +    ExecutorService executor = Executors.newFixedThreadPool(16,
 +        new NamingThreadFactory("addSplits"));
 +    try {
 +      executor.execute(
 +          new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
 +
 +      while (!latch.await(100, TimeUnit.MILLISECONDS)) {
 +        if (exception.get() != null) {
 +          executor.shutdownNow();
 +          Throwable excep = exception.get();
 +          // Below all exceptions are wrapped and rethrown. This is done so that the user knows what
 +          // code path got them here. If the wrapping was not done, the
 +          // user would only have the stack trace for the background thread.
 +          if (excep instanceof TableNotFoundException) {
 +            TableNotFoundException tnfe = (TableNotFoundException) excep;
 +            throw new TableNotFoundException(tableId.canonicalID(), tableName,
 +                "Table not found by background thread", tnfe);
 +          } else if (excep instanceof TableOfflineException) {
 +            log.debug("TableOfflineException occurred in background thread. Throwing new exception",
 +                excep);
 +            throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
 +          } else if (excep instanceof AccumuloSecurityException) {
 +            // base == background accumulo security exception
 +            AccumuloSecurityException base = (AccumuloSecurityException) excep;
 +            throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(),
 +                base.getTableInfo(), excep);
 +          } else if (excep instanceof AccumuloServerException) {
 +            throw new AccumuloServerException((AccumuloServerException) excep);
 +          } else if (excep instanceof Error) {
 +            throw new Error(excep);
 +          } else {
 +            throw new AccumuloException(excep);
 +          }
 +        }
 +      }
 +    } catch (InterruptedException e) {
 +      throw new RuntimeException(e);
 +    } finally {
 +      executor.shutdown();
 +    }
 +  }
 +
 +  private void addSplits(String tableName, SortedSet<Text> partitionKeys, Table.ID tableId)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
 +      AccumuloServerException {
 +    TabletLocator tabLocator = TabletLocator.getLocator(context, tableId);
 +
 +    for (Text split : partitionKeys) {
 +      boolean successful = false;
 +      int attempt = 0;
 +      long locationFailures = 0;
 +
 +      while (!successful) {
 +
 +        if (attempt > 0)
 +          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +
 +        attempt++;
 +
 +        TabletLocation tl = tabLocator.locateTablet(context, split, false, false);
 +
 +        if (tl == null) {
 +          if (!Tables.exists(context, tableId))
 +            throw new TableNotFoundException(tableId.canonicalID(), tableName, null);
 +          else if (Tables.getTableState(context, tableId) == TableState.OFFLINE)
 +            throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
 +          continue;
 +        }
 +
 +        HostAndPort address = HostAndPort.fromString(tl.tablet_location);
 +
 +        try {
 +          TabletClientService.Client client = ThriftUtil.getTServerClient(address, context);
 +          try {
 +
 +            OpTimer timer = null;
 +
 +            if (log.isTraceEnabled()) {
 +              log.trace("tid={} Splitting tablet {} on {} at {}", Thread.currentThread().getId(),
 +                  tl.tablet_extent, address, split);
 +              timer = new OpTimer().start();
 +            }
 +
 +            client.splitTablet(Tracer.traceInfo(), context.rpcCreds(), tl.tablet_extent.toThrift(),
 +                TextUtil.getByteBuffer(split));
 +
 +            // just split it, might as well invalidate it in the cache
 +            tabLocator.invalidateCache(tl.tablet_extent);
 +
 +            if (timer != null) {
 +              timer.stop();
 +              log.trace("Split tablet in {}",
 +                  String.format("%.3f secs", timer.scale(TimeUnit.SECONDS)));
 +            }
 +
 +          } finally {
 +            ThriftUtil.returnClient(client);
 +          }
 +
 +        } catch (TApplicationException tae) {
 +          throw new AccumuloServerException(address.toString(), tae);
 +        } catch (TTransportException e) {
 +          tabLocator.invalidateCache(context, tl.tablet_location);
 +          continue;
 +        } catch (ThriftSecurityException e) {
 +          Tables.clearCache(context);
 +          if (!Tables.exists(context, tableId))
 +            throw new TableNotFoundException(tableId.canonicalID(), tableName, null);
 +          throw new AccumuloSecurityException(e.user, e.code, e);
 +        } catch (NotServingTabletException e) {
 +          // Do not silently spin when we repeatedly fail to get the location for a tablet
 +          locationFailures++;
 +          if (locationFailures == 5 || locationFailures % 50 == 0) {
 +            log.warn("Having difficulty locating hosting tabletserver for split {} on table {}."
 +                + " Seen {} failures.", split, tableName, locationFailures);
 +          }
 +
 +          tabLocator.invalidateCache(tl.tablet_extent);
 +          continue;
 +        } catch (TException e) {
 +          tabLocator.invalidateCache(context, tl.tablet_location);
 +          continue;
 +        }
 +
 +        successful = true;
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void merge(String tableName, Text start, Text end)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +
 +    checkArgument(tableName != null, "tableName is null");
 +    ByteBuffer EMPTY = ByteBuffer.allocate(0);
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)),
 +        start == null ? EMPTY : TextUtil.getByteBuffer(start),
 +        end == null ? EMPTY : TextUtil.getByteBuffer(end));
 +    Map<String,String> opts = new HashMap<>();
 +    try {
 +      doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_MERGE, args,
 +          opts);
 +    } catch (TableExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +  }
 +
 +  @Override
 +  public void deleteRows(String tableName, Text start, Text end)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +
 +    checkArgument(tableName != null, "tableName is null");
 +    ByteBuffer EMPTY = ByteBuffer.allocate(0);
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)),
 +        start == null ? EMPTY : TextUtil.getByteBuffer(start),
 +        end == null ? EMPTY : TextUtil.getByteBuffer(end));
 +    Map<String,String> opts = new HashMap<>();
 +    try {
 +      doTableFateOperation(tableName, TableNotFoundException.class,
 +          FateOperation.TABLE_DELETE_RANGE, args, opts);
 +    } catch (TableExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +  }
 +
 +  @Override
 +  public Collection<Text> listSplits(String tableName)
 +      throws TableNotFoundException, AccumuloSecurityException {
 +
 +    checkArgument(tableName != null, "tableName is null");
 +
 +    Table.ID tableId = Tables.getTableId(context, tableName);
 +
 +    TreeMap<KeyExtent,String> tabletLocations = new TreeMap<>();
 +
 +    while (true) {
 +      try {
 +        tabletLocations.clear();
 +        // the following method throws AccumuloException for some conditions that should be retried
 +        MetadataServicer.forTableId(context, tableId).getTabletLocations(tabletLocations);
 +        break;
 +      } catch (AccumuloSecurityException ase) {
 +        throw ase;
 +      } catch (Exception e) {
 +        if (!Tables.exists(context, tableId)) {
 +          throw new TableNotFoundException(tableId.canonicalID(), tableName, null);
 +        }
 +
 +        if (e instanceof RuntimeException && e.getCause() instanceof AccumuloSecurityException) {
 +          throw (AccumuloSecurityException) e.getCause();
 +        }
 +
 +        log.info("{} ... retrying ...", e.getMessage());
 +        sleepUninterruptibly(3, TimeUnit.SECONDS);
 +      }
 +    }
 +
 +    ArrayList<Text> endRows = new ArrayList<>(tabletLocations.size());
 +
 +    for (KeyExtent ke : tabletLocations.keySet())
 +      if (ke.getEndRow() != null)
 +        endRows.add(ke.getEndRow());
 +
 +    return endRows;
 +  }
 +
 +  @Override
 +  public Collection<Text> listSplits(String tableName, int maxSplits)
 +      throws TableNotFoundException, AccumuloSecurityException {
 +    Collection<Text> endRows = listSplits(tableName);
 +
 +    if (endRows.size() <= maxSplits)
 +      return endRows;
 +
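 +    // Evenly sample the end rows: advance a fractional position by r for each row and emit a
 +    // split each time the position crosses 1. For example, with 9 end rows and maxSplits = 4,
 +    // r is 5/9 and roughly every other end row is selected.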
 +    double r = (maxSplits + 1) / (double) (endRows.size());
 +    double pos = 0;
 +
 +    ArrayList<Text> subset = new ArrayList<>(maxSplits);
 +
 +    int j = 0;
 +    for (int i = 0; i < endRows.size() && j < maxSplits; i++) {
 +      pos += r;
 +      while (pos > 1) {
 +        subset.add(((ArrayList<Text>) endRows).get(i));
 +        j++;
 +        pos -= 1;
 +      }
 +    }
 +
 +    return subset;
 +  }
 +
 +  @Override
 +  public void delete(String tableName)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    checkArgument(tableName != null, "tableName is null");
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)));
 +    Map<String,String> opts = new HashMap<>();
 +
 +    try {
 +      doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_DELETE,
 +          args, opts);
 +    } catch (TableExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +
 +  }
 +
 +  @Override
 +  public void clone(String srcTableName, String newTableName, boolean flush,
 +      Map<String,String> propertiesToSet, Set<String> propertiesToExclude)
 +      throws AccumuloSecurityException, TableNotFoundException, AccumuloException,
 +      TableExistsException {
 +
 +    checkArgument(srcTableName != null, "srcTableName is null");
 +    checkArgument(newTableName != null, "newTableName is null");
 +
 +    Table.ID srcTableId = Tables.getTableId(context, srcTableName);
 +
 +    if (flush)
 +      _flush(srcTableId, null, null, true);
 +
 +    if (propertiesToExclude == null)
 +      propertiesToExclude = Collections.emptySet();
 +
 +    if (propertiesToSet == null)
 +      propertiesToSet = Collections.emptyMap();
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(srcTableId.getUtf8()),
 +        ByteBuffer.wrap(newTableName.getBytes(UTF_8)));
 +    Map<String,String> opts = new HashMap<>();
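 +    // Properties to set pass through the opts map as-is, while properties to exclude are encoded
 +    // by prefixing them with CLONE_EXCLUDE_PREFIX, so both fit in the single map sent to the fate
 +    // operation.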
 +    for (Entry<String,String> entry : propertiesToSet.entrySet()) {
 +      if (entry.getKey().startsWith(CLONE_EXCLUDE_PREFIX))
 +        throw new IllegalArgumentException("Property can not start with " + CLONE_EXCLUDE_PREFIX);
 +      opts.put(entry.getKey(), entry.getValue());
 +    }
 +
 +    for (String prop : propertiesToExclude) {
 +      opts.put(CLONE_EXCLUDE_PREFIX + prop, "");
 +    }
 +
 +    doTableFateOperation(newTableName, AccumuloException.class, FateOperation.TABLE_CLONE, args,
 +        opts);
 +  }
 +
 +  @Override
 +  public void rename(String oldTableName, String newTableName) throws AccumuloSecurityException,
 +      TableNotFoundException, AccumuloException, TableExistsException {
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes(UTF_8)),
 +        ByteBuffer.wrap(newTableName.getBytes(UTF_8)));
 +    Map<String,String> opts = new HashMap<>();
 +    doTableFateOperation(oldTableName, TableNotFoundException.class, FateOperation.TABLE_RENAME,
 +        args, opts);
 +  }
 +
 +  @Override
 +  public void flush(String tableName) throws AccumuloException, AccumuloSecurityException {
 +    try {
 +      flush(tableName, null, null, false);
 +    } catch (TableNotFoundException e) {
 +      throw new AccumuloException(e.getMessage(), e);
 +    }
 +  }
 +
 +  @Override
 +  public void flush(String tableName, Text start, Text end, boolean wait)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    checkArgument(tableName != null, "tableName is null");
 +
 +    Table.ID tableId = Tables.getTableId(context, tableName);
 +    _flush(tableId, start, end, wait);
 +  }
 +
 +  @Override
 +  public void compact(String tableName, Text start, Text end, boolean flush, boolean wait)
 +      throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
 +    compact(tableName, start, end, new ArrayList<>(), flush, wait);
 +  }
 +
 +  @Override
 +  public void compact(String tableName, Text start, Text end, List<IteratorSetting> iterators,
 +      boolean flush, boolean wait)
 +      throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
 +    compact(tableName, new CompactionConfig().setStartRow(start).setEndRow(end)
 +        .setIterators(iterators).setFlush(flush).setWait(wait));
 +  }
 +
 +  @Override
 +  public void compact(String tableName, CompactionConfig config)
 +      throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
 +    checkArgument(tableName != null, "tableName is null");
 +    ByteBuffer EMPTY = ByteBuffer.allocate(0);
 +
 +    // Ensure compaction iterators exist on a tabletserver
 +    final String skviName = SortedKeyValueIterator.class.getName();
 +    for (IteratorSetting setting : config.getIterators()) {
 +      String iteratorClass = setting.getIteratorClass();
 +      if (!testClassLoad(tableName, iteratorClass, skviName)) {
 +        throw new AccumuloException("TabletServer could not load iterator class " + iteratorClass);
 +      }
 +    }
 +
 +    // Make sure the specified compaction strategy exists on a tabletserver
 +    final String compactionStrategyName = config.getCompactionStrategy().getClassName();
 +    if (!CompactionStrategyConfigUtil.DEFAULT_STRATEGY.getClassName()
 +        .equals(compactionStrategyName)) {
 +      if (!testClassLoad(tableName, compactionStrategyName,
 +          "org.apache.accumulo.tserver.compaction.CompactionStrategy")) {
 +        throw new AccumuloException(
 +            "TabletServer could not load CompactionStrategy class " + compactionStrategyName);
 +      }
 +    }
 +
 +    Table.ID tableId = Tables.getTableId(context, tableName);
 +
 +    Text start = config.getStartRow();
 +    Text end = config.getEndRow();
 +
 +    if (config.getFlush())
 +      _flush(tableId, start, end, true);
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8()),
 +        start == null ? EMPTY : TextUtil.getByteBuffer(start),
 +        end == null ? EMPTY : TextUtil.getByteBuffer(end),
 +        ByteBuffer.wrap(IteratorUtil.encodeIteratorSettings(config.getIterators())),
 +        ByteBuffer.wrap(CompactionStrategyConfigUtil.encode(config.getCompactionStrategy())));
 +
 +    Map<String,String> opts = new HashMap<>();
 +    try {
 +      doFateOperation(FateOperation.TABLE_COMPACT, args, opts, tableName, config.getWait());
 +    } catch (TableExistsException | NamespaceExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    } catch (NamespaceNotFoundException e) {
 +      throw new TableNotFoundException(null, tableName, "Namespace not found", e);
 +    }
 +  }
 +
 +  @Override
 +  public void cancelCompaction(String tableName)
 +      throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
 +    Table.ID tableId = Tables.getTableId(context, tableName);
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8()));
 +
 +    Map<String,String> opts = new HashMap<>();
 +    try {
 +      doTableFateOperation(tableName, TableNotFoundException.class,
 +          FateOperation.TABLE_CANCEL_COMPACT, args, opts);
 +    } catch (TableExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +
 +  }
 +
 +  private void _flush(Table.ID tableId, Text start, Text end, boolean wait)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +
 +    try {
 +      long flushID;
 +
 +      // This used to pass the table name, but the table id associated with a name could change
 +      // between calls, so the table id is passed to both calls.
 +
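 +      // Flushing is a two step process: ask the master to initiate a flush and get back a flush
 +      // id, then call waitForFlush with that id, which blocks until done (or makes only a single
 +      // check when wait is false).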
 +      while (true) {
 +        MasterClientService.Iface client = null;
 +        try {
 +          client = MasterClient.getConnectionWithRetry(context);
 +          flushID = client.initiateFlush(Tracer.traceInfo(), context.rpcCreds(),
 +              tableId.canonicalID());
 +          break;
 +        } catch (TTransportException tte) {
 +          log.debug("Failed to call initiateFlush, retrying ... ", tte);
 +          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +        } catch (ThriftNotActiveServiceException e) {
 +          // Let it loop, fetching a new location
 +          log.debug("Contacted a Master which is no longer active, retrying");
 +          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +        } finally {
 +          MasterClient.close(client);
 +        }
 +      }
 +
 +      while (true) {
 +        MasterClientService.Iface client = null;
 +        try {
 +          client = MasterClient.getConnectionWithRetry(context);
 +          client.waitForFlush(Tracer.traceInfo(), context.rpcCreds(), tableId.canonicalID(),
 +              TextUtil.getByteBuffer(start), TextUtil.getByteBuffer(end), flushID,
 +              wait ? Long.MAX_VALUE : 1);
 +          break;
 +        } catch (TTransportException tte) {
 +          log.debug("Failed to call initiateFlush, retrying ... ", tte);
 +          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +        } catch (ThriftNotActiveServiceException e) {
 +          // Let it loop, fetching a new location
 +          log.debug("Contacted a Master which is no longer active, retrying");
 +          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +        } finally {
 +          MasterClient.close(client);
 +        }
 +      }
 +    } catch (ThriftSecurityException e) {
 +      switch (e.getCode()) {
 +        case TABLE_DOESNT_EXIST:
 +          throw new TableNotFoundException(tableId.canonicalID(), null, e.getMessage(), e);
 +        default:
 +          log.debug("flush security exception on table id {}", tableId);
 +          throw new AccumuloSecurityException(e.user, e.code, e);
 +      }
 +    } catch (ThriftTableOperationException e) {
 +      switch (e.getType()) {
 +        case NOTFOUND:
 +          throw new TableNotFoundException(e);
 +        default:
 +          throw new AccumuloException(e.description, e);
 +      }
 +    } catch (Exception e) {
 +      throw new AccumuloException(e);
 +    }
 +  }
 +
 +  @Override
 +  public void setProperty(final String tableName, final String property, final String value)
 +      throws AccumuloException, AccumuloSecurityException {
 +    checkArgument(tableName != null, "tableName is null");
 +    checkArgument(property != null, "property is null");
 +    checkArgument(value != null, "value is null");
 +    try {
-       MasterClient.executeTable(context, client -> client.setTableProperty(Tracer.traceInfo(),
-           context.rpcCreds(), tableName, property, value));
++      setPropertyNoChecks(tableName, property, value);
++
++      checkLocalityGroups(tableName, property);
 +    } catch (TableNotFoundException e) {
 +      throw new AccumuloException(e);
 +    }
 +  }
 +
++  private void setPropertyNoChecks(final String tableName, final String property,
++      final String value)
++      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
++    MasterClient.executeTable(context, client -> client.setTableProperty(Tracer.traceInfo(),
++        context.rpcCreds(), tableName, property, value));
++  }
++
 +  @Override
 +  public void removeProperty(final String tableName, final String property)
 +      throws AccumuloException, AccumuloSecurityException {
 +    checkArgument(tableName != null, "tableName is null");
 +    checkArgument(property != null, "property is null");
 +    try {
-       MasterClient.executeTable(context, client -> client.removeTableProperty(Tracer.traceInfo(),
-           context.rpcCreds(), tableName, property));
++      removePropertyNoChecks(tableName, property);
++
++      checkLocalityGroups(tableName, property);
 +    } catch (TableNotFoundException e) {
 +      throw new AccumuloException(e);
 +    }
 +  }
 +
++  private void removePropertyNoChecks(final String tableName, final String property)
++      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
++    MasterClient.executeTable(context, client -> client.removeTableProperty(Tracer.traceInfo(),
++        context.rpcCreds(), tableName, property));
++  }
++
++  void checkLocalityGroups(String tableName, String propChanged)
++      throws AccumuloException, TableNotFoundException {
++    if (LocalityGroupUtil.isLocalityGroupProperty(propChanged)) {
++      Iterable<Entry<String,String>> allProps = getProperties(tableName);
++      try {
++        LocalityGroupUtil.checkLocalityGroups(allProps);
++      } catch (LocalityGroupConfigurationError | RuntimeException e) {
++        LoggerFactory.getLogger(this.getClass()).warn("Changing '" + propChanged + "' for table '"
++            + tableName
++            + "' resulted in bad locality group config.  This may be a transient situation since "
++            + "the config spreads over multiple properties.  Setting properties in a different "
++            + "order may help.  Even though this warning was displayed, the property was updated. "
++            + "Please check your config to ensure consistency.", e);
++      }
++    }
++  }
++
 +  @Override
 +  public Iterable<Entry<String,String>> getProperties(final String tableName)
 +      throws AccumuloException, TableNotFoundException {
 +    checkArgument(tableName != null, "tableName is null");
 +    try {
 +      return ServerClient.executeRaw(context,
 +          client -> client.getTableConfiguration(Tracer.traceInfo(), context.rpcCreds(), tableName))
 +          .entrySet();
 +    } catch (ThriftTableOperationException e) {
 +      switch (e.getType()) {
 +        case NOTFOUND:
 +          throw new TableNotFoundException(e);
 +        case NAMESPACE_NOTFOUND:
 +          throw new TableNotFoundException(tableName, new NamespaceNotFoundException(e));
 +        default:
 +          throw new AccumuloException(e.description, e);
 +      }
 +    } catch (AccumuloException e) {
 +      throw e;
 +    } catch (Exception e) {
 +      throw new AccumuloException(e);
 +    }
 +
 +  }
 +
 +  @Override
 +  public void setLocalityGroups(String tableName, Map<String,Set<Text>> groups)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    // ensure locality groups do not overlap
 +    LocalityGroupUtil.ensureNonOverlappingGroups(groups);
 +
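 +    // The no-check property setters are used here so each intermediate change is not validated
 +    // individually; the locality group config can look inconsistent until all of its properties
 +    // have been written.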
 +    for (Entry<String,Set<Text>> entry : groups.entrySet()) {
 +      Set<Text> colFams = entry.getValue();
 +      String value = LocalityGroupUtil.encodeColumnFamilies(colFams);
-       setProperty(tableName, Property.TABLE_LOCALITY_GROUP_PREFIX + entry.getKey(), value);
++      setPropertyNoChecks(tableName, Property.TABLE_LOCALITY_GROUP_PREFIX + entry.getKey(), value);
 +    }
 +
 +    try {
-       setProperty(tableName, Property.TABLE_LOCALITY_GROUPS.getKey(),
++      setPropertyNoChecks(tableName, Property.TABLE_LOCALITY_GROUPS.getKey(),
 +          Joiner.on(",").join(groups.keySet()));
 +    } catch (AccumuloException e) {
 +      if (e.getCause() instanceof TableNotFoundException)
 +        throw (TableNotFoundException) e.getCause();
 +      throw e;
 +    }
 +
 +    // remove anything extraneous
 +    String prefix = Property.TABLE_LOCALITY_GROUP_PREFIX.getKey();
 +    for (Entry<String,String> entry : getProperties(tableName)) {
 +      String property = entry.getKey();
 +      if (property.startsWith(prefix)) {
 +        // this property configures a locality group, find out which one:
 +        String[] parts = property.split("\\.");
 +        String group = parts[parts.length - 1];
 +
 +        if (!groups.containsKey(group)) {
-           removeProperty(tableName, property);
++          removePropertyNoChecks(tableName, property);
 +        }
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public Map<String,Set<Text>> getLocalityGroups(String tableName)
 +      throws AccumuloException, TableNotFoundException {
 +    AccumuloConfiguration conf = new ConfigurationCopy(this.getProperties(tableName));
 +    Map<String,Set<ByteSequence>> groups = LocalityGroupUtil.getLocalityGroups(conf);
 +
 +    Map<String,Set<Text>> groups2 = new HashMap<>();
 +    for (Entry<String,Set<ByteSequence>> entry : groups.entrySet()) {
 +
 +      HashSet<Text> colFams = new HashSet<>();
 +
 +      for (ByteSequence bs : entry.getValue()) {
 +        colFams.add(new Text(bs.toArray()));
 +      }
 +
 +      groups2.put(entry.getKey(), colFams);
 +    }
 +
 +    return groups2;
 +  }
 +
 +  @Override
 +  public Set<Range> splitRangeByTablets(String tableName, Range range, int maxSplits)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    checkArgument(tableName != null, "tableName is null");
 +    checkArgument(range != null, "range is null");
 +    if (maxSplits < 1)
 +      throw new IllegalArgumentException("maximum splits must be >= 1");
 +    if (maxSplits == 1)
 +      return Collections.singleton(range);
 +
 +    Random random = new SecureRandom();
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
 +    Table.ID tableId = Tables.getTableId(context, tableName);
 +    TabletLocator tl = TabletLocator.getLocator(context, tableId);
 +    // it's possible that the cache could contain complete but stale information about a table's
 +    // tablets, so clear it
 +    tl.invalidateCache();
 +    while (!tl.binRanges(context, Collections.singletonList(range), binnedRanges).isEmpty()) {
 +      if (!Tables.exists(context, tableId))
 +        throw new TableDeletedException(tableId.canonicalID());
 +      if (Tables.getTableState(context, tableId) == TableState.OFFLINE)
 +        throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
 +
 +      log.warn("Unable to locate bins for specified range. Retrying.");
 +      // sleep randomly between 100 and 200ms
 +      sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
 +      binnedRanges.clear();
 +      tl.invalidateCache();
 +    }
 +
 +    // group key extents to get <= maxSplits
 +    LinkedList<KeyExtent> unmergedExtents = new LinkedList<>();
 +    List<KeyExtent> mergedExtents = new ArrayList<>();
 +
 +    for (Map<KeyExtent,List<Range>> map : binnedRanges.values())
 +      unmergedExtents.addAll(map.keySet());
 +
 +    // the sort method is efficient for a linked list
 +    Collections.sort(unmergedExtents);
 +
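 +    // Merge pairs of adjacent extents until the count is within maxSplits. Each pass roughly
 +    // halves the number of extents; when a pass runs out of pairs, the merged list becomes the
 +    // new unmerged list and another pass begins.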
 +    while (unmergedExtents.size() + mergedExtents.size() > maxSplits) {
 +      if (unmergedExtents.size() >= 2) {
 +        KeyExtent first = unmergedExtents.removeFirst();
 +        KeyExtent second = unmergedExtents.removeFirst();
 +        first.setEndRow(second.getEndRow());
 +        mergedExtents.add(first);
 +      } else {
 +        mergedExtents.addAll(unmergedExtents);
 +        unmergedExtents.clear();
 +        unmergedExtents.addAll(mergedExtents);
 +        mergedExtents.clear();
 +      }
 +
 +    }
 +
 +    mergedExtents.addAll(unmergedExtents);
 +
 +    Set<Range> ranges = new HashSet<>();
 +    for (KeyExtent k : mergedExtents)
 +      ranges.add(k.toDataRange().clip(range));
 +
 +    return ranges;
 +  }
 +
 +  private Path checkPath(String dir, String kind, String type)
 +      throws IOException, AccumuloException, AccumuloSecurityException {
 +    Path ret;
 +    Map<String,String> props = context.instanceOperations().getSystemConfiguration();
 +    AccumuloConfiguration conf = new ConfigurationCopy(props);
 +
 +    FileSystem fs = VolumeConfiguration.getVolume(dir, CachedConfiguration.getInstance(), conf)
 +        .getFileSystem();
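 +    // Qualify the path against the volume's filesystem unless it already names a scheme, then
 +    // verify it is an existing directory (and, for a failure directory, that it is empty).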
 +
 +    if (dir.contains(":")) {
 +      ret = new Path(dir);
 +    } else {
 +      ret = fs.makeQualified(new Path(dir));
 +    }
 +
 +    try {
 +      if (!fs.getFileStatus(ret).isDirectory()) {
 +        throw new AccumuloException(
 +            kind + " import " + type + " directory " + dir + " is not a directory!");
 +      }
 +    } catch (FileNotFoundException fnf) {
 +      throw new AccumuloException(
 +          kind + " import " + type + " directory " + dir + " does not exist!");
 +    }
 +
 +    if (type.equals("failure")) {
 +      FileStatus[] listStatus = fs.listStatus(ret);
 +      if (listStatus != null && listStatus.length != 0) {
 +        throw new AccumuloException("Bulk import failure directory " + ret + " is not empty");
 +      }
 +    }
 +
 +    return ret;
 +  }
 +
 +  @Override
 +  @Deprecated
 +  public void importDirectory(String tableName, String dir, String failureDir, boolean setTime)
 +      throws IOException, AccumuloSecurityException, TableNotFoundException, AccumuloException {
 +    checkArgument(tableName != null, "tableName is null");
 +    checkArgument(dir != null, "dir is null");
 +    checkArgument(failureDir != null, "failureDir is null");
 +    // check for table existence
 +    Tables.getTableId(context, tableName);
 +
 +    Path dirPath = checkPath(dir, "Bulk", "");
 +    Path failPath = checkPath(failureDir, "Bulk", "failure");
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)),
 +        ByteBuffer.wrap(dirPath.toString().getBytes(UTF_8)),
 +        ByteBuffer.wrap(failPath.toString().getBytes(UTF_8)),
 +        ByteBuffer.wrap((setTime + "").getBytes(UTF_8)));
 +    Map<String,String> opts = new HashMap<>();
 +
 +    try {
 +      doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_BULK_IMPORT,
 +          args, opts);
 +    } catch (TableExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +  }
 +
 +  private void waitForTableStateTransition(Table.ID tableId, TableState expectedState)
 +      throws AccumuloException, TableNotFoundException {
 +
 +    Text startRow = null;
 +    Text lastRow = null;
 +
 +    while (true) {
 +
 +      if (Tables.getTableState(context, tableId) != expectedState) {
 +        Tables.clearCache(context);
 +        TableState currentState = Tables.getTableState(context, tableId);
 +        if (currentState != expectedState) {
 +          if (!Tables.exists(context, tableId))
 +            throw new TableDeletedException(tableId.canonicalID());
 +          if (currentState == TableState.DELETING)
 +            throw new TableNotFoundException(tableId.canonicalID(), "", "Table is being deleted.");
 +          throw new AccumuloException("Unexpected table state " + tableId + " "
 +              + Tables.getTableState(context, tableId) + " != " + expectedState);
 +        }
 +      }
 +
 +      Range range;
 +      if (startRow == null || lastRow == null)
 +        range = new KeyExtent(tableId, null, null).toMetadataRange();
 +      else
 +        range = new Range(startRow, lastRow);
 +
 +      TabletsMetadata tablets = TabletsMetadata.builder().scanMetadataTable().overRange(range)
 +          .fetchLocation().fetchPrev().build(context);
 +
 +      KeyExtent lastExtent = null;
 +
 +      int total = 0;
 +      int waitFor = 0;
 +      int holes = 0;
 +      Text continueRow = null;
 +      MapCounter<String> serverCounts = new MapCounter<>();
 +
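 +      // Count tablets still in the wrong state: when onlining, a tablet with no location (or
 +      // only a future one) is counted; when offlining, a tablet that still has a location is
 +      // counted. Holes (non-contiguous extents) indicate an inconsistent scan and force a full
 +      // rescan below.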
 +      for (TabletMetadata tablet : tablets) {
 +        total++;
 +
 +        Location loc = tablet.getLocation();
 +
 +        if ((expectedState == TableState.ONLINE
 +            && (loc == null || loc.getType() == LocationType.FUTURE))
 +            || (expectedState == TableState.OFFLINE && loc != null)) {
 +          if (continueRow == null)
 +            continueRow = tablet.getExtent().getMetadataEntry();
 +          waitFor++;
 +          lastRow = tablet.getExtent().getMetadataEntry();
 +
 +          if (loc != null) {
 +            serverCounts.increment(loc.getId(), 1);
 +          }
 +        }
 +
 +        if (!tablet.getExtent().getTableId().equals(tableId)) {
 +          throw new AccumuloException(
 +              "Saw unexpected table Id " + tableId + " " + tablet.getExtent());
 +        }
 +
 +        if (lastExtent != null && !tablet.getExtent().isPreviousExtent(lastExtent)) {
 +          holes++;
 +        }
 +
 +        lastExtent = tablet.getExtent();
 +      }
 +
 +      if (continueRow != null) {
 +        startRow = continueRow;
 +      }
 +
 +      if (holes > 0 || total == 0) {
 +        startRow = null;
 +        lastRow = null;
 +      }
 +
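 +      // Back off in proportion to the busiest server (10ms per tablet it must transition),
 +      // clamped to between 100ms and 5s, before rescanning.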
 +      if (waitFor > 0 || holes > 0 || total == 0) {
 +        long waitTime;
 +        long maxPerServer = 0;
 +        if (serverCounts.size() > 0) {
 +          maxPerServer = Collections.max(serverCounts.values());
 +          waitTime = maxPerServer * 10;
 +        } else
 +          waitTime = waitFor * 10;
 +        waitTime = Math.max(100, waitTime);
 +        waitTime = Math.min(5000, waitTime);
 +        log.trace("Waiting for {}({}) tablets, startRow = {} lastRow = {}, holes={} sleeping:{}ms",
 +            waitFor, maxPerServer, startRow, lastRow, holes, waitTime);
 +        sleepUninterruptibly(waitTime, TimeUnit.MILLISECONDS);
 +      } else {
 +        break;
 +      }
 +
 +    }
 +  }
 +
 +  @Override
 +  public void offline(String tableName)
 +      throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
 +    offline(tableName, false);
 +  }
 +
 +  @Override
 +  public void offline(String tableName, boolean wait)
 +      throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
 +
 +    checkArgument(tableName != null, "tableName is null");
 +    Table.ID tableId = Tables.getTableId(context, tableName);
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8()));
 +    Map<String,String> opts = new HashMap<>();
 +
 +    try {
 +      doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_OFFLINE,
 +          args, opts);
 +    } catch (TableExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +
 +    if (wait)
 +      waitForTableStateTransition(tableId, TableState.OFFLINE);
 +  }
 +
 +  @Override
 +  public void online(String tableName)
 +      throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
 +    online(tableName, false);
 +  }
 +
 +  @Override
 +  public void online(String tableName, boolean wait)
 +      throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
 +    checkArgument(tableName != null, "tableName is null");
 +
 +    Table.ID tableId = Tables.getTableId(context, tableName);
 +
 +    /**
 +     * ACCUMULO-4574: if the table is already online, return without executing the fate operation.
 +     */
 +
 +    TableState expectedState = Tables.getTableState(context, tableId, true);
 +    if (expectedState == TableState.ONLINE) {
 +      if (wait)
 +        waitForTableStateTransition(tableId, TableState.ONLINE);
 +      return;
 +    }
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8()));
 +    Map<String,String> opts = new HashMap<>();
 +
 +    try {
 +      doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_ONLINE,
 +          args, opts);
 +    } catch (TableExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +
 +    if (wait)
 +      waitForTableStateTransition(tableId, TableState.ONLINE);
 +  }
 +
 +  @Override
 +  public void clearLocatorCache(String tableName) throws TableNotFoundException {
 +    checkArgument(tableName != null, "tableName is null");
 +    TabletLocator tabLocator = TabletLocator.getLocator(context,
 +        Tables.getTableId(context, tableName));
 +    tabLocator.invalidateCache();
 +  }
 +
 +  @Override
 +  public Map<String,String> tableIdMap() {
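 +    // Collect the name->id map into a sorted TreeMap of canonical id strings; the merge function
 +    // exists only to fail fast, since table names are expected to be unique.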
 +    return Tables.getNameToIdMap(context).entrySet().stream()
 +        .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().canonicalID(), (v1, v2) -> {
 +          throw new RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2));
 +        }, TreeMap::new));
 +  }
 +
 +  @Override
 +  public Text getMaxRow(String tableName, Authorizations auths, Text startRow,
 +      boolean startInclusive, Text endRow, boolean endInclusive) throws TableNotFoundException {
 +    checkArgument(tableName != null, "tableName is null");
 +    checkArgument(auths != null, "auths is null");
 +    Scanner scanner = context.createScanner(tableName, auths);
 +    return FindMax.findMax(scanner, startRow, startInclusive, endRow, endInclusive);
 +  }
 +
 +  @Override
 +  public List<DiskUsage> getDiskUsage(Set<String> tableNames)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +
 +    List<TDiskUsage> diskUsages = null;
 +    while (diskUsages == null) {
 +      Pair<String,Client> pair = null;
 +      try {
 +        // this operation may use a lot of memory... it's likely that connections to tabletservers
 +        // hosting metadata tablets will be cached, so do not use cached connections
 +        pair = ServerClient.getConnection(context, false);
 +        diskUsages = pair.getSecond().getDiskUsage(tableNames, context.rpcCreds());
 +      } catch (ThriftTableOperationException e) {
 +        switch (e.getType()) {
 +          case NOTFOUND:
 +            throw new TableNotFoundException(e);
 +          case NAMESPACE_NOTFOUND:
 +            throw new TableNotFoundException(e.getTableName(), new NamespaceNotFoundException(e));
 +          default:
 +            throw new AccumuloException(e.description, e);
 +        }
 +      } catch (ThriftSecurityException e) {
 +        throw new AccumuloSecurityException(e.getUser(), e.getCode());
 +      } catch (TTransportException e) {
 +        // some sort of communication error occurred, retry
 +        if (pair == null) {
 +          log.debug("Disk usage request failed.  Pair is null.  Retrying request...", e);
 +        } else {
 +          log.debug("Disk usage request failed {}, retrying ... ", pair.getFirst(), e);
 +        }
 +        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      } catch (TException e) {
 +        // may be a TApplicationException which indicates error on the server side
 +        throw new AccumuloException(e);
 +      } finally {
 +        // must always return thrift connection
 +        if (pair != null)
 +          ServerClient.close(pair.getSecond());
 +      }
 +    }
 +
 +    List<DiskUsage> finalUsages = new ArrayList<>();
 +    for (TDiskUsage diskUsage : diskUsages) {
 +      finalUsages.add(new DiskUsage(new TreeSet<>(diskUsage.getTables()), diskUsage.getUsage()));
 +    }
 +
 +    return finalUsages;
 +  }
 +
 +  public static Map<String,String> getExportedProps(FileSystem fs, Path path) throws IOException {
 +    HashMap<String,String> props = new HashMap<>();
 +
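 +    // The export file is a zip; scan its entries for the table config file and parse each line
 +    // as a key=value property.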
 +    try (ZipInputStream zis = new ZipInputStream(fs.open(path))) {
 +      ZipEntry zipEntry;
 +      while ((zipEntry = zis.getNextEntry()) != null) {
 +        if (zipEntry.getName().equals(Constants.EXPORT_TABLE_CONFIG_FILE)) {
 +          try (BufferedReader in = new BufferedReader(new InputStreamReader(zis, UTF_8))) {
 +            String line;
 +            while ((line = in.readLine()) != null) {
 +              String[] sa = line.split("=", 2);
 +              props.put(sa[0], sa[1]);
 +            }
 +          }
 +
 +          break;
 +        }
 +      }
 +    }
 +    return props;
 +  }
 +
 +  @Override
 +  public void importTable(String tableName, String importDir)
 +      throws TableExistsException, AccumuloException, AccumuloSecurityException {
 +    checkArgument(tableName != null, "tableName is null");
 +    checkArgument(importDir != null, "importDir is null");
 +
 +    try {
 +      importDir = checkPath(importDir, "Table", "").toString();
 +    } catch (IOException e) {
 +      throw new AccumuloException(e);
 +    }
 +
 +    try {
 +      FileSystem fs = new Path(importDir).getFileSystem(CachedConfiguration.getInstance());
 +      Map<String,String> props = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE));
 +
 +      for (Entry<String,String> entry : props.entrySet()) {
 +        if (Property.isClassProperty(entry.getKey())
 +            && !entry.getValue().contains(Constants.CORE_PACKAGE_NAME)) {
 +          LoggerFactory.getLogger(this.getClass()).info(
 +              "Imported table sets '{}' to '{}'.  Ensure this class is on Accumulo classpath.",
 +              sanitize(entry.getKey()), sanitize(entry.getValue()));
 +        }
 +      }
 +
 +    } catch (IOException ioe) {
 +      LoggerFactory.getLogger(this.getClass()).warn(
 +          "Failed to check if imported table references external java classes : {}",
 +          ioe.getMessage());
 +    }
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)),
 +        ByteBuffer.wrap(importDir.getBytes(UTF_8)));
 +
 +    Map<String,String> opts = Collections.emptyMap();
 +
 +    try {
 +      doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_IMPORT, args,
 +          opts);
 +    } catch (TableNotFoundException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +
 +  }
 +
 +  /**
 +   * Prevent potential CRLF injection into logs from user-supplied data that is read in. See
 +   * https://find-sec-bugs.github.io/bugs.htm#CRLF_INJECTION_LOGS
 +   */
 +  private String sanitize(String msg) {
 +    return msg.replaceAll("[\r\n]", "");
 +  }
 +
 +  @Override
 +  public void exportTable(String tableName, String exportDir)
 +      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
 +    checkArgument(tableName != null, "tableName is null");
 +    checkArgument(exportDir != null, "exportDir is null");
 +
 +    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)),
 +        ByteBuffer.wrap(exportDir.getBytes(UTF_8)));
 +
 +    Map<String,String> opts = Collections.emptyMap();
 +
 +    try {
 +      doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_EXPORT,
 +          args, opts);
 +    } catch (TableExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    }
 +  }
 +
 +  @Override
 +  public boolean testClassLoad(final String tableName, final String className,
 +      final String asTypeName)
 +      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
 +    checkArgument(tableName != null, "tableName is null");
 +    checkArgument(className != null, "className is null");
 +    checkArgument(asTypeName != null, "asTypeName is null");
 +
 +    try {
 +      return ServerClient.executeRaw(context, client -> client.checkTableClass(Tracer.traceInfo(),
 +          context.rpcCreds(), tableName, className, asTypeName));
 +    } catch (ThriftTableOperationException e) {
 +      switch (e.getType()) {
 +        case NOTFOUND:
 +          throw new TableNotFoundException(e);
 +        case NAMESPACE_NOTFOUND:
 +          throw new TableNotFoundException(tableName, new NamespaceNotFoundException(e));
 +        default:
 +          throw new AccumuloException(e.description, e);
 +      }
 +    } catch (ThriftSecurityException e) {
 +      throw new AccumuloSecurityException(e.user, e.code, e);
 +    } catch (AccumuloException e) {
 +      throw e;
 +    } catch (Exception e) {
 +      throw new AccumuloException(e);
 +    }
 +  }
 +
 +  @Override
 +  public void attachIterator(String tableName, IteratorSetting setting,
 +      EnumSet<IteratorScope> scopes)
 +      throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
 +    testClassLoad(tableName, setting.getIteratorClass(), SortedKeyValueIterator.class.getName());
 +    super.attachIterator(tableName, setting, scopes);
 +  }
 +
 +  @Override
 +  public int addConstraint(String tableName, String constraintClassName)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    testClassLoad(tableName, constraintClassName, Constraint.class.getName());
 +    return super.addConstraint(tableName, constraintClassName);
 +  }
 +
 +  private void doTableFateOperation(String tableOrNamespaceName,
 +      Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op,
 +      List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException,
 +      AccumuloException, TableExistsException, TableNotFoundException {
 +    try {
 +      doFateOperation(op, args, opts, tableOrNamespaceName);
 +    } catch (NamespaceExistsException e) {
 +      // should not happen
 +      throw new AssertionError(e);
 +    } catch (NamespaceNotFoundException e) {
 +      if (namespaceNotFoundExceptionClass == null) {
 +        // should not happen
 +        throw new AssertionError(e);
 +      } else if (AccumuloException.class.isAssignableFrom(namespaceNotFoundExceptionClass)) {
 +        throw new AccumuloException("Cannot create table in non-existent namespace", e);
 +      } else if (TableNotFoundException.class.isAssignableFrom(namespaceNotFoundExceptionClass)) {
 +        throw new TableNotFoundException(null, tableOrNamespaceName, "Namespace not found", e);
 +      } else {
 +        // should not happen
 +        throw new AssertionError(e);
 +      }
 +    }
 +  }
 +
 +  private void clearSamplerOptions(String tableName)
 +      throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
 +    String prefix = Property.TABLE_SAMPLER_OPTS.getKey();
 +    for (Entry<String,String> entry : getProperties(tableName)) {
 +      String property = entry.getKey();
 +      if (property.startsWith(prefix)) {
 +        removeProperty(tableName, property);
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void setSamplerConfiguration(String tableName, SamplerConfiguration samplerConfiguration)
 +      throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
 +    clearSamplerOptions(tableName);
 +
 +    List<Pair<String,String>> props = new SamplerConfigurationImpl(samplerConfiguration)
 +        .toTableProperties();
 +    for (Pair<String,String> pair : props) {
 +      setProperty(tableName, pair.getFirst(), pair.getSecond());
 +    }
 +  }
 +
 +  @Override
 +  public void clearSamplerConfiguration(String tableName)
 +      throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
 +    removeProperty(tableName, Property.TABLE_SAMPLER.getKey());
 +    clearSamplerOptions(tableName);
 +  }
 +
 +  @Override
 +  public SamplerConfiguration getSamplerConfiguration(String tableName)
 +      throws TableNotFoundException, AccumuloException {
 +    AccumuloConfiguration conf = new ConfigurationCopy(this.getProperties(tableName));
 +    SamplerConfigurationImpl sci = SamplerConfigurationImpl.newSamplerConfig(conf);
 +    if (sci == null) {
 +      return null;
 +    }
 +    return sci.toSamplerConfiguration();
 +  }
 +
 +  private static class LocationsImpl implements Locations {
 +
 +    private Map<Range,List<TabletId>> groupedByRanges;
 +    private Map<TabletId,List<Range>> groupedByTablets;
 +    private Map<TabletId,String> tabletLocations;
 +
 +    public LocationsImpl(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) {
 +      groupedByTablets = new HashMap<>();
 +      groupedByRanges = null;
 +      tabletLocations = new HashMap<>();
 +
 +      for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
 +        String location = entry.getKey();
 +
 +        for (Entry<KeyExtent,List<Range>> entry2 : entry.getValue().entrySet()) {
 +          TabletIdImpl tabletId = new TabletIdImpl(entry2.getKey());
 +          tabletLocations.put(tabletId, location);
 +          List<Range> prev = groupedByTablets.put(tabletId,
 +              Collections.unmodifiableList(entry2.getValue()));
 +          if (prev != null) {
 +            throw new RuntimeException(
 +                "Unexpected : tablet at multiple locations : " + location + " " + tabletId);
 +          }
 +        }
 +      }
 +
 +      groupedByTablets = Collections.unmodifiableMap(groupedByTablets);
 +    }
 +
 +    @Override
 +    public String getTabletLocation(TabletId tabletId) {
 +      return tabletLocations.get(tabletId);
 +    }
 +
 +    @Override
 +    public Map<Range,List<TabletId>> groupByRange() {
 +      if (groupedByRanges == null) {
 +        Map<Range,List<TabletId>> tmp = new HashMap<>();
 +
 +        for (Entry<TabletId,List<Range>> entry : groupedByTablets.entrySet()) {
 +          for (Range range : entry.getValue()) {
 +            List<TabletId> tablets = tmp.get(range);
 +            if (tablets == null) {
 +              tablets = new ArrayList<>();
 +              tmp.put(range, tablets);
 +            }
 +
 +            tablets.add(entry.getKey());
 +          }
 +        }
 +
 +        Map<Range,List<TabletId>> tmp2 = new HashMap<>();
 +        for (Entry<Range,List<TabletId>> entry : tmp.entrySet()) {
 +          tmp2.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
 +        }
 +
 +        groupedByRanges = Collections.unmodifiableMap(tmp2);
 +      }
 +
 +      return groupedByRanges;
 +    }
 +
 +    @Override
 +    public Map<TabletId,List<Range>> groupByTablet() {
 +      return groupedByTablets;
 +    }
 +  }
 +
 +  @Override
 +  public Locations locate(String tableName, Collection<Range> ranges)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    requireNonNull(tableName, "tableName must be non null");
 +    requireNonNull(ranges, "ranges must be non null");
 +
 +    Table.ID tableId = Tables.getTableId(context, tableName);
 +    TabletLocator locator = TabletLocator.getLocator(context, tableId);
 +
 +    List<Range> rangeList = null;
 +    if (ranges instanceof List) {
 +      rangeList = (List<Range>) ranges;
 +    } else {
 +      rangeList = new ArrayList<>(ranges);
 +    }
 +
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
 +
 +    locator.invalidateCache();
 +
 +    Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
 +        .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).logInterval(3, TimeUnit.MINUTES)
 +        .createRetry();
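 +    // Bin the ranges to tablets, using the retry policy above to back off while any ranges fail
 +    // to bin (for example, while tablets are being reassigned).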
 +
 +    while (!locator.binRanges(context, rangeList, binnedRanges).isEmpty()) {
 +
 +      if (!Tables.exists(context, tableId))
 +        throw new TableNotFoundException(tableId.canonicalID(), tableName, null);
 +      if (Tables.getTableState(context, tableId) == TableState.OFFLINE)
 +        throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
 +
 +      binnedRanges.clear();
 +
 +      try {
 +        retry.waitForNextAttempt();
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +
 +      locator.invalidateCache();
 +    }
 +
 +    return new LocationsImpl(binnedRanges);
 +  }
 +
 +  @Override
 +  public SummaryRetriever summaries(String tableName) {
 +
 +    return new SummaryRetriever() {
 +
 +      private Text startRow = null;
 +      private Text endRow = null;
 +      private List<TSummarizerConfiguration> summariesToFetch = Collections.emptyList();
 +      private String summarizerClassRegex;
 +      private boolean flush = false;
 +
 +      @Override
 +      public SummaryRetriever startRow(Text startRow) {
 +        Objects.requireNonNull(startRow);
 +        if (endRow != null) {
 +          Preconditions.checkArgument(startRow.compareTo(endRow) < 0,
 +              "Start row must be less than end row : %s >= %s", startRow, endRow);
 +        }
 +        this.startRow = startRow;
 +        return this;
 +      }
 +
 +      @Override
 +      public SummaryRetriever startRow(CharSequence startRow) {
 +        return startRow(new Text(startRow.toString()));
 +      }
 +
 +      @Override
 +      public List<Summary> retrieve()
 +          throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +        Table.ID tableId = Tables.getTableId(context, tableName);
 +        if (Tables.getTableState(context, tableId) == TableState.OFFLINE)
 +          throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
 +
 +        TRowRange range = new TRowRange(TextUtil.getByteBuffer(startRow),
 +            TextUtil.getByteBuffer(endRow));
 +        TSummaryRequest request = new TSummaryRequest(tableId.canonicalID(), range,
 +            summariesToFetch, summarizerClassRegex);
 +        if (flush) {
 +          _flush(tableId, startRow, endRow, true);
 +        }
 +
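 +        // Summary collection is session based: start the request, then poll with the returned
 +        // session id until the tablet server reports that gathering is finished.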
 +        TSummaries ret = ServerClient.execute(context, new TabletClientService.Client.Factory(),
 +            client -> {
 +              TSummaries tsr = client.startGetSummaries(Tracer.traceInfo(), context.rpcCreds(),
 +                  request);
 +              while (!tsr.finished) {
 +                tsr = client.contiuneGetSummaries(Tracer.traceInfo(), tsr.sessionId);
 +              }
 +              return tsr;
 +            });
 +        return new SummaryCollection(ret).getSummaries();
 +      }
 +
 +      @Override
 +      public SummaryRetriever endRow(Text endRow) {
 +        Objects.requireNonNull(endRow);
 +        if (startRow != null) {
 +          Preconditions.checkArgument(startRow.compareTo(endRow) < 0,
 +              "Start row must be less than end row : %s >= %s", startRow, endRow);
 +        }
 +        this.endRow = endRow;
 +        return this;
 +      }
 +
 +      @Override
 +      public SummaryRetriever endRow(CharSequence endRow) {
 +        return endRow(new Text(endRow.toString()));
 +      }
 +
 +      @Override
 +      public SummaryRetriever withConfiguration(Collection<SummarizerConfiguration> configs) {
 +        Objects.requireNonNull(configs);
 +        summariesToFetch = configs.stream().map(SummarizerConfigurationUtil::toThrift)
 +            .collect(Collectors.toList());
 +        return this;
 +      }
 +
 +      @Override
 +      public SummaryRetriever withConfiguration(SummarizerConfiguration... config) {
 +        Objects.requireNonNull(config);
 +        return withConfiguration(Arrays.asList(config));
 +      }
 +
 +      @Override
 +      public SummaryRetriever withMatchingConfiguration(String regex) {
 +        Objects.requireNonNull(regex);
 +        // Do a sanity check here to make sure that regex compiles, instead of having it fail on a
 +        // tserver.
 +        Pattern.compile(regex);
 +        this.summarizerClassRegex = regex;
 +        return this;
 +      }
 +
 +      @Override
 +      public SummaryRetriever flush(boolean b) {
 +        this.flush = b;
 +        return this;
 +      }
 +    };
 +  }
 +
 +  @Override
 +  public void addSummarizers(String tableName, SummarizerConfiguration... newConfigs)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    HashSet<SummarizerConfiguration> currentConfigs = new HashSet<>(
 +        SummarizerConfiguration.fromTableProperties(getProperties(tableName)));
 +    HashSet<SummarizerConfiguration> newConfigSet = new HashSet<>(Arrays.asList(newConfigs));
 +
 +    newConfigSet.removeIf(currentConfigs::contains);
 +
 +    Set<String> newIds = newConfigSet.stream().map(SummarizerConfiguration::getPropertyId)
 +        .collect(toSet());
 +
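 +    // Identical configs were filtered out above, so any remaining property id collision means a
 +    // different summarizer config is already using that id.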
 +    for (SummarizerConfiguration csc : currentConfigs) {
 +      if (newIds.contains(csc.getPropertyId())) {
 +        throw new IllegalArgumentException("Summarizer property id is in use by " + csc);
 +      }
 +    }
 +
 +    Set<Entry<String,String>> es = SummarizerConfiguration.toTableProperties(newConfigSet)
 +        .entrySet();
 +    for (Entry<String,String> entry : es) {
 +      setProperty(tableName, entry.getKey(), entry.getValue());
 +    }
 +  }
 +
 +  @Override
 +  public void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate)
 +      throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
 +    Collection<SummarizerConfiguration> summarizerConfigs = SummarizerConfiguration
 +        .fromTableProperties(getProperties(tableName));
 +    for (SummarizerConfiguration sc : summarizerConfigs) {
 +      if (predicate.test(sc)) {
 +        Set<String> ks = sc.toTableProperties().keySet();
 +        for (String key : ks) {
 +          removeProperty(tableName, key);
 +        }
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public List<SummarizerConfiguration> listSummarizers(String tableName)
 +      throws AccumuloException, TableNotFoundException {
 +    return new ArrayList<>(SummarizerConfiguration.fromTableProperties(getProperties(tableName)));
 +  }
 +
 +  @Override
 +  public ImportDestinationArguments importDirectory(String directory) {
 +    return new BulkImport(directory, context);
 +  }
 +}
diff --cc core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
index 0e3fc4b,040f8d0..98568a0
--- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@@ -30,7 -30,8 +30,9 @@@ import java.util.SortedSet
  import java.util.TreeSet;
  
  import org.apache.accumulo.core.client.AccumuloException;
++import org.apache.accumulo.core.clientImpl.Table;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.ConfigurationCopy;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.ArrayByteSequence;
  import org.apache.accumulo.core.data.ByteSequence;
@@@ -79,6 -84,31 +83,31 @@@ public class LocalityGroupUtil 
      }
    }
  
+   public static boolean isLocalityGroupProperty(String prop) {
+     return prop.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey())
+         || prop.equals(Property.TABLE_LOCALITY_GROUPS.getKey());
+   }
+ 
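+   // Validates the locality group config as a whole: when locality groups are configured,
+   // parsing them throws on an invalid definition (e.g. a column family assigned to two groups).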
+   public static void checkLocalityGroups(Iterable<Entry<String,String>> config)
+       throws LocalityGroupConfigurationError {
+     ConfigurationCopy cc = new ConfigurationCopy(config);
+     if (cc.get(Property.TABLE_LOCALITY_GROUPS) != null) {
+       getLocalityGroups(cc);
+     }
+   }
+ 
+   public static Map<String,Set<ByteSequence>> getLocalityGroupsIgnoringErrors(
 -      AccumuloConfiguration acuconf, String tableId) {
++      AccumuloConfiguration acuconf, Table.ID tableId) {
+     try {
+       return getLocalityGroups(acuconf);
+     } catch (LocalityGroupConfigurationError | RuntimeException e) {
+       log.warn("Failed to get locality group config for tableId:" + tableId
+           + ", proceeding without locality groups.", e);
+     }
+ 
+     return Collections.emptyMap();
+   }
+ 
    public static Map<String,Set<ByteSequence>> getLocalityGroups(AccumuloConfiguration acuconf)
        throws LocalityGroupConfigurationError {
      Map<String,Set<ByteSequence>> result = new HashMap<>();
@@@ -351,15 -391,4 +389,20 @@@
  
      reader.seek(range, families, inclusive);
    }
 +
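 +  // Verifies that no column family appears in more than one locality group and that no group
 +  // is empty.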
 +  public static void ensureNonOverlappingGroups(Map<String,Set<Text>> groups) {
 +    HashSet<Text> all = new HashSet<>();
 +    for (Entry<String,Set<Text>> entry : groups.entrySet()) {
 +      if (!Collections.disjoint(all, entry.getValue())) {
 +        throw new IllegalArgumentException(
 +            "Group " + entry.getKey() + " overlaps with another group");
 +      }
++
++      if (entry.getValue().isEmpty()) {
++        throw new IllegalArgumentException("Group " + entry.getKey() + " is empty");
++      }
++
 +      all.addAll(entry.getValue());
 +    }
 +  }
  }
diff --cc server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
index 0609c70,3d76cca..e3b0498
--- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@@ -27,10 -27,22 +27,9 @@@ import java.util.Map.Entry
  import java.util.Set;
  import java.util.TreeSet;
  
- import org.apache.accumulo.core.Constants;
 -import org.apache.accumulo.core.client.AccumuloException;
  import org.apache.accumulo.core.client.AccumuloSecurityException;
 -import org.apache.accumulo.core.client.Instance;
  import org.apache.accumulo.core.client.NamespaceNotFoundException;
  import org.apache.accumulo.core.client.TableNotFoundException;
 -import org.apache.accumulo.core.client.impl.Credentials;
 -import org.apache.accumulo.core.client.impl.Namespaces;
 -import org.apache.accumulo.core.client.impl.Tables;
 -import org.apache.accumulo.core.client.impl.thrift.ClientService;
 -import org.apache.accumulo.core.client.impl.thrift.ConfigurationType;
 -import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 -import org.apache.accumulo.core.client.impl.thrift.TDiskUsage;
 -import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 -import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 -import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
  import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
  import org.apache.accumulo.core.client.security.tokens.KerberosToken;
  import org.apache.accumulo.core.client.security.tokens.PasswordToken;
@@@ -338,15 -339,14 +337,14 @@@ public class ClientServiceHandler imple
          throw new AccumuloSecurityException(credentials.getPrincipal(),
              SecurityErrorCode.PERMISSION_DENIED);
        bulkImportStatus.updateBulkImportStatus(files, BulkImportState.INITIAL);
 -      log.debug("Got request to bulk import files to table(" + tableId + "): " + files);
 +      log.debug("Got request to bulk import files to table({}): {}", tableId, files);
-       return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
-         bulkImportStatus.updateBulkImportStatus(files, BulkImportState.PROCESSING);
-         try {
-           return BulkImporter.bulkLoad(context, tid, tableId, files, setTime);
-         } finally {
-           bulkImportStatus.removeBulkImportStatus(files);
-         }
-       });
+ 
+       bulkImportStatus.updateBulkImportStatus(files, BulkImportState.PROCESSING);
+       try {
 -        return BulkImporter.bulkLoad(context, tid, tableId, files, errorDir, setTime);
++        return BulkImporter.bulkLoad(context, tid, tableId, files, setTime);
+       } finally {
+         bulkImportStatus.removeBulkImportStatus(files);
+       }
      } catch (AccumuloSecurityException e) {
        throw e.asThriftException();
      } catch (Exception ex) {
diff --cc server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
index b1eba60,12f3133..9c42d41
--- a/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
@@@ -159,12 -160,25 +159,25 @@@ public class WalStateManager 
    public List<Path> getWalsInUse(TServerInstance tsi) throws WalMarkerException {
      List<Path> result = new ArrayList<>();
      try {
 -      String zpath = root() + "/" + tsi.toString();
 +      String zpath = root() + "/" + tsi;
        zoo.sync(zpath);
        for (String child : zoo.getChildren(zpath)) {
-         Pair<WalState,Path> parts = parse(zoo.getData(zpath + "/" + child, null));
-         if (parts.getFirst() != WalState.UNREFERENCED) {
-           result.add(parts.getSecond());
+         byte[] zdata = null;
+         try {
+           // This function is called by the Master. It's possible that the Accumulo GC deletes an
+           // unreferenced WAL in ZK after the call to getChildren above. Catch the exception inside
+           // the loop so that the remaining children are not ignored.
+           zdata = zoo.getData(zpath + "/" + child, null);
+         } catch (KeeperException.NoNodeException e) {
+           log.debug("WAL state removed {} {} during getWalsInUse.  Likely a race condition between "
+               + "master and GC.", tsi, child);
+         }
+ 
+         if (zdata != null) {
+           Pair<WalState,Path> parts = parse(zdata);
+           if (parts.getFirst() != WalState.UNREFERENCED) {
+             result.add(parts.getSecond());
+           }
          }
        }
      } catch (KeeperException.NoNodeException e) {
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
index 308cb3b,0000000..1a3b4bd
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
@@@ -1,276 -1,0 +1,279 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps.bulkVer1;
 +
 +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
 +import org.apache.accumulo.core.clientImpl.Table;
 +import org.apache.accumulo.core.clientImpl.Tables;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.master.thrift.BulkImportState;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.master.tableOps.MasterRepo;
 +import org.apache.accumulo.master.tableOps.Utils;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.MapFile;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import com.google.common.annotations.VisibleForTesting;
++
 +/**
 + * Bulk import makes requests of tablet servers, and those requests can take a long time. Our
 + * communications to the tablet server may fail, so we won't know the status of a request. The
 + * master will repeat failed requests, so there may be multiple requests to the tablet server. The
 + * tablet server will not execute a request multiple times, so long as the marker it wrote in the
 + * metadata table stays there. The master needs to know when all requests have finished so it can
 + * remove the markers. Did it start? Did it finish? We can see that *a* request completed by seeing
 + * the flag written into the metadata table, but we won't know if some other rogue thread is still
 + * waiting to start and repeat the operation.
 + *
 + * The master can ask the tablet server if it has any requests still running. However, the tablet
 + * server might have a thread that is about to start a request but has not yet recorded any
 + * bookkeeping for it. To prevent problems like this, an Arbitrator is used. Before starting any
 + * new request, the tablet server checks the Arbitrator to see if the request is still valid.
 + */
 +public class BulkImport extends MasterRepo {
 +  public static final String FAILURES_TXT = "failures.txt";
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private static final Logger log = LoggerFactory.getLogger(BulkImport.class);
 +
 +  private Table.ID tableId;
 +  private String sourceDir;
 +  private String errorDir;
 +  private boolean setTime;
 +
 +  public BulkImport(Table.ID tableId, String sourceDir, String errorDir, boolean setTime) {
 +    this.tableId = tableId;
 +    this.sourceDir = sourceDir;
 +    this.errorDir = errorDir;
 +    this.setTime = setTime;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    if (!Utils.getReadLock(master, tableId, tid).tryLock())
 +      return 100;
 +
 +    Tables.clearCache(master.getContext());
 +    if (Tables.getTableState(master.getContext(), tableId) == TableState.ONLINE) {
 +      long reserve1, reserve2;
 +      reserve1 = reserve2 = Utils.reserveHdfsDirectory(master, sourceDir, tid);
 +      if (reserve1 == 0)
 +        reserve2 = Utils.reserveHdfsDirectory(master, errorDir, tid);
 +      return reserve2;
 +    } else {
 +      throw new AcceptableThriftTableOperationException(tableId.canonicalID(), null,
 +          TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
 +    }
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    log.debug(" tid {} sourceDir {}", tid, sourceDir);
 +
 +    Utils.getReadLock(master, tableId, tid).lock();
 +
 +    // check that the error directory exists and is empty
 +    VolumeManager fs = master.getFileSystem();
 +
 +    Path errorPath = new Path(errorDir);
 +    FileStatus errorStatus = null;
 +    try {
 +      errorStatus = fs.getFileStatus(errorPath);
 +    } catch (FileNotFoundException ex) {
 +      // ignored
 +    }
 +    if (errorStatus == null)
 +      throw new AcceptableThriftTableOperationException(tableId.canonicalID(), null,
 +          TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 +          errorDir + " does not exist");
 +    if (!errorStatus.isDirectory())
 +      throw new AcceptableThriftTableOperationException(tableId.canonicalID(), null,
 +          TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 +          errorDir + " is not a directory");
 +    if (fs.listStatus(errorPath).length != 0)
 +      throw new AcceptableThriftTableOperationException(tableId.canonicalID(), null,
 +          TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 +          errorDir + " is not empty");
 +
 +    ZooArbitrator.start(master.getContext(), Constants.BULK_ARBITRATOR_TYPE, tid);
 +    master.updateBulkImportStatus(sourceDir, BulkImportState.MOVING);
 +    // move the files into the directory
 +    try {
-       String bulkDir = prepareBulkImport(master, fs, sourceDir, tableId);
++      String bulkDir = prepareBulkImport(master.getContext(), fs, sourceDir, tableId);
 +      log.debug(" tid {} bulkDir {}", tid, bulkDir);
 +      return new LoadFiles(tableId, sourceDir, bulkDir, errorDir, setTime);
 +    } catch (IOException ex) {
 +      log.error("error preparing the bulk import directory", ex);
 +      throw new AcceptableThriftTableOperationException(tableId.canonicalID(), null,
 +          TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_INPUT_DIRECTORY,
 +          sourceDir + ": " + ex);
 +    }
 +  }
 +
-   private Path createNewBulkDir(ServerContext context, VolumeManager fs, Table.ID tableId)
-       throws IOException {
++  private static Path createNewBulkDir(ServerContext context, VolumeManager fs, String sourceDir,
++      Table.ID tableId) throws IOException {
 +    Path tempPath = fs.matchingFileSystem(new Path(sourceDir),
 +        ServerConstants.getTablesDirs(context.getConfiguration()));
 +    if (tempPath == null)
 +      throw new IOException(sourceDir + " is not in a volume configured for Accumulo");
 +
 +    String tableDir = tempPath.toString();
 +    if (tableDir == null)
 +      throw new IOException(sourceDir + " is not in a volume configured for Accumulo");
 +    Path directory = new Path(tableDir + "/" + tableId);
 +    fs.mkdirs(directory);
 +
 +    // only one should be able to create the lock file
 +    // the purpose of the lock file is to avoid a race
 +    // condition between the call to fs.exists() and
 +    // fs.mkdirs()... if only hadoop had a mkdir() function
 +    // that failed when the dir existed
 +
 +    UniqueNameAllocator namer = context.getUniqueNameAllocator();
 +
 +    while (true) {
 +      Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
 +      if (fs.exists(newBulkDir)) // sanity check
 +        throw new IOException("Dir exists when it should not " + newBulkDir);
 +      if (fs.mkdirs(newBulkDir))
 +        return newBulkDir;
 +      log.warn("Failed to create {} for unknown reason", newBulkDir);
 +
 +      sleepUninterruptibly(3, TimeUnit.SECONDS);
 +    }
 +  }
 +
-   private String prepareBulkImport(Master master, final VolumeManager fs, String dir,
++  @VisibleForTesting
++  public static String prepareBulkImport(ServerContext master, final VolumeManager fs, String dir,
 +      Table.ID tableId) throws Exception {
-     final Path bulkDir = createNewBulkDir(master.getContext(), fs, tableId);
++    final Path bulkDir = createNewBulkDir(master, fs, dir, tableId);
 +
-     MetadataTableUtil.addBulkLoadInProgressFlag(master.getContext(),
++    MetadataTableUtil.addBulkLoadInProgressFlag(master,
 +        "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
 +
 +    Path dirPath = new Path(dir);
 +    FileStatus[] mapFiles = fs.listStatus(dirPath);
 +
-     final UniqueNameAllocator namer = master.getContext().getUniqueNameAllocator();
++    final UniqueNameAllocator namer = master.getUniqueNameAllocator();
 +
 +    int workerCount = master.getConfiguration().getCount(Property.MASTER_BULK_RENAME_THREADS);
 +    SimpleThreadPool workers = new SimpleThreadPool(workerCount, "bulk move");
 +    List<Future<Exception>> results = new ArrayList<>();
 +
 +    for (FileStatus file : mapFiles) {
 +      final FileStatus fileStatus = file;
 +      results.add(workers.submit(() -> {
 +        try {
 +          String[] sa = fileStatus.getPath().getName().split("\\.");
 +          String extension = "";
 +          if (sa.length > 1) {
 +            extension = sa[sa.length - 1];
 +
 +            if (!FileOperations.getValidExtensions().contains(extension)) {
 +              log.warn("{} does not have a valid extension, ignoring", fileStatus.getPath());
 +              return null;
 +            }
 +          } else {
 +            // assume it is a map file
 +            extension = Constants.MAPFILE_EXTENSION;
 +          }
 +
 +          if (extension.equals(Constants.MAPFILE_EXTENSION)) {
 +            if (!fileStatus.isDirectory()) {
 +              log.warn("{} is not a map file, ignoring", fileStatus.getPath());
 +              return null;
 +            }
 +
 +            if (fileStatus.getPath().getName().equals("_logs")) {
 +              log.info("{} is probably a log directory from a map/reduce task, skipping",
 +                  fileStatus.getPath());
 +              return null;
 +            }
 +            try {
 +              FileStatus dataStatus = fs
 +                  .getFileStatus(new Path(fileStatus.getPath(), MapFile.DATA_FILE_NAME));
 +              if (dataStatus.isDirectory()) {
 +                log.warn("{} is not a map file, ignoring", fileStatus.getPath());
 +                return null;
 +              }
 +            } catch (FileNotFoundException fnfe) {
 +              log.warn("{} is not a map file, ignoring", fileStatus.getPath());
 +              return null;
 +            }
 +          }
 +
 +          String newName = "I" + namer.getNextName() + "." + extension;
 +          Path newPath = new Path(bulkDir, newName);
 +          try {
 +            fs.rename(fileStatus.getPath(), newPath);
 +            log.debug("Moved {} to {}", fileStatus.getPath(), newPath);
 +          } catch (IOException ioe) {
 +            log.error("Could not move: {} {}", fileStatus.getPath(), ioe.getMessage());
 +          }
 +
 +        } catch (Exception ex) {
 +          return ex;
 +        }
 +        return null;
 +      }));
 +    }
 +    workers.shutdown();
 +    while (!workers.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {}
 +
 +    for (Future<Exception> ex : results) {
 +      if (ex.get() != null) {
 +        throw ex.get();
 +      }
 +    }
 +    return bulkDir.toString();
 +  }
 +
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    // unreserve source/error directories
 +    Utils.unreserveHdfsDirectory(environment, sourceDir, tid);
 +    Utils.unreserveHdfsDirectory(environment, errorDir, tid);
 +    Utils.getReadLock(environment, tableId, tid).unlock();
 +    ZooArbitrator.cleanup(environment.getContext(), Constants.BULK_ARBITRATOR_TYPE, tid);
 +  }
 +}
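
The Arbitrator protocol described in the class javadoc above reduces to a liveness check made
immediately before any work is started or repeated. The following rough sketch shows that
guard; the Arbitrator interface here is a hypothetical stand-in for the real
TransactionWatcher/ZooArbitrator plumbing, not code from this commit.

    // Hypothetical stand-in for the real arbitrator, for illustration only.
    interface Arbitrator {
      boolean transactionAlive(String type, long tid) throws Exception;
    }

    class GuardedBulkLoad {
      private final Arbitrator arbitrator;

      GuardedBulkLoad(Arbitrator arbitrator) {
        this.arbitrator = arbitrator;
      }

      void load(String type, long tid, Runnable doLoad) throws Exception {
        // Check the arbitrator before doing any work. If the master has already
        // finished the transaction and removed the markers, a late or duplicate
        // request must fail instead of re-applying the load.
        if (!arbitrator.transactionAlive(type, tid)) {
          throw new IllegalStateException("bulk transaction " + tid + " is no longer alive");
        }
        doLoad.run();
      }
    }
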
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index b9e2167,d96577a..338f007
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@@ -39,6 -39,6 +39,7 @@@ import java.util.concurrent.atomic.Atom
  
  import org.apache.accumulo.core.client.SampleNotPresentException;
  import org.apache.accumulo.core.client.sample.Sampler;
++import org.apache.accumulo.core.clientImpl.Table;
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
  import org.apache.accumulo.core.conf.ConfigurationCopy;
  import org.apache.accumulo.core.conf.Property;
@@@ -132,16 -131,14 +132,15 @@@ public class InMemoryMap 
      return pair.getSecond();
    }
  
-   public InMemoryMap(AccumuloConfiguration config, ServerContext serverContext)
-       throws LocalityGroupConfigurationError {
 -  public InMemoryMap(AccumuloConfiguration config, String tableId) {
++  public InMemoryMap(AccumuloConfiguration config, ServerContext serverContext, Table.ID tableId) {
  
      boolean useNativeMap = config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED);
  
      this.memDumpDir = config.get(Property.TSERV_MEMDUMP_DIR);
-     this.lggroups = LocalityGroupUtil.getLocalityGroups(config);
+     this.lggroups = LocalityGroupUtil.getLocalityGroupsIgnoringErrors(config, tableId);
  
      this.config = config;
 +    this.context = serverContext;
  
      SimpleMap allMap;
      SimpleMap sampleMap;
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 08c5495,dc4d349..e42cba8
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -2175,6 -2034,14 +2188,12 @@@ public class TabletServer implements Ru
              // modification
            }
  
+           List<DfsLogger> closedCopy;
+ 
+           synchronized (closedLogs) {
+             closedCopy = copyClosedLogs(closedLogs);
+           }
+ 
 -          int numMajorCompactionsInProgress = 0;
 -
            Iterator<Entry<KeyExtent,Tablet>> iter = copyOnlineTablets.entrySet().iterator();
  
            // bail early now if we're shutting down
@@@ -2191,17 -2058,33 +2210,10 @@@
                continue;
              }
  
-             int maxLogEntriesPerTablet = getTableConfiguration(tablet.getExtent())
-                 .getCount(Property.TABLE_MINC_LOGS_MAX);
- 
-             if (tablet.getLogCount() >= maxLogEntriesPerTablet) {
-               log.debug("Initiating minor compaction for {} because it has {} write ahead logs",
-                   tablet.getExtent(), tablet.getLogCount());
-               tablet.initiateMinorCompaction(MinorCompactionReason.SYSTEM);
-             }
+             tablet.checkIfMinorCompactionNeededForLogs(closedCopy);
  
              synchronized (tablet) {
 -              if (tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL)
 -                  || tablet.isMajorCompactionQueued() || tablet.isMajorCompactionRunning()) {
 -                numMajorCompactionsInProgress++;
 -                continue;
 -              }
 -            }
 -          }
 -
 -          int idleCompactionsToStart = Math.max(1,
 -              getConfiguration().getCount(Property.TSERV_MAJC_MAXCONCURRENT) / 2);
 -
 -          if (numMajorCompactionsInProgress < idleCompactionsToStart) {
 -            // system is not major compacting, can schedule some
 -            // idle compactions
 -            iter = copyOnlineTablets.entrySet().iterator();
 -
 -            while (iter.hasNext() && !majorCompactorDisabled
 -                && numMajorCompactionsInProgress < idleCompactionsToStart) {
 -              Entry<KeyExtent,Tablet> entry = iter.next();
 -              Tablet tablet = entry.getValue();
 -
 -              if (tablet.initiateMajorCompaction(MajorCompactionReason.IDLE)) {
 -                numMajorCompactionsInProgress++;
 -              }
 +              tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL);
              }
            }
          } catch (Throwable t) {
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 721b0d8,8f32aae..9381a88
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@@ -76,6 -68,8 +76,7 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import com.google.common.base.Joiner;
 -import com.google.common.base.Optional;
+ import com.google.common.base.Preconditions;
  
  /**
   * Wrap a connection to a logger.
@@@ -327,9 -321,9 +328,10 @@@ public class DfsLogger implements Compa
    private AtomicLong syncCounter;
    private AtomicLong flushCounter;
    private final long slowFlushMillis;
+   private long writes = 0;
  
 -  private DfsLogger(ServerResources conf) {
 +  private DfsLogger(ServerContext context, ServerResources conf) {
 +    this.context = context;
      this.conf = conf;
      this.slowFlushMillis = conf.getConfiguration()
          .getTimeInMillis(Property.TSERV_SLOW_FLUSH_MILLIS);
@@@ -538,7 -598,12 +540,12 @@@
        }
    }
  
+   public synchronized long getWrites() {
+     Preconditions.checkState(writes >= 0);
+     return writes;
+   }
+ 
 -  public synchronized void defineTablet(long seq, int tid, KeyExtent tablet) throws IOException {
 +  public LoggerOperation defineTablet(CommitSession cs) throws IOException {
      // write this log to the METADATA table
      final LogFileKey key = new LogFileKey();
      key.event = DEFINE_TABLET;
@@@ -552,29 -628,31 +559,27 @@@
      key.write(encryptingLogFile);
      value.write(encryptingLogFile);
      encryptingLogFile.flush();
+     writes++;
    }
  
 -  public LoggerOperation log(long seq, int tid, Mutation mutation, Durability durability)
 -      throws IOException {
 -    return logManyTablets(Collections.singletonList(
 -        new TabletMutations(tid, seq, Collections.singletonList(mutation), durability)));
 +  private LoggerOperation logKeyData(LogFileKey key, Durability d) throws IOException {
 +    return logFileData(singletonList(new Pair<>(key, EMPTY)), d);
    }
  
    private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys,
        Durability durability) throws IOException {
      DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1), durability);
 -    synchronized (DfsLogger.this) {
 -      try {
 -        for (Pair<LogFileKey,LogFileValue> pair : keys) {
 -          write(pair.getFirst(), pair.getSecond());
 -        }
 -      } catch (ClosedChannelException ex) {
 -        throw new LogClosedException();
 -      } catch (Exception e) {
 -        log.error("Failed to write log entries", e);
 -        work.exception = e;
 +    try {
 +      for (Pair<LogFileKey,LogFileValue> pair : keys) {
 +        write(pair.getFirst(), pair.getSecond());
        }
 +    } catch (ClosedChannelException ex) {
 +      throw new LogClosedException();
 +    } catch (Exception e) {
 +      log.error("Failed to write log entries", e);
 +      work.exception = e;
      }
  
-     if (durability == Durability.LOG)
-       return NO_WAIT_LOGGER_OP;
- 
      synchronized (closeLock) {
        // use a different lock for close check so that adding to work queue does not need
        // to wait on walog I/O operations
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
index ae9185e,5bffa69..3f9ded3
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
@@@ -200,16 -209,10 +209,11 @@@ public class Compactor implements Calla
      try {
        FileOperations fileFactory = FileOperations.getInstance();
        FileSystem ns = this.fs.getVolumeByPath(outputFilePath).getFileSystem();
 -      mfw = fileFactory.newWriterBuilder().forFile(outputFilePathName, ns, ns.getConf())
 +      mfw = fileFactory.newWriterBuilder()
 +          .forFile(outputFilePathName, ns, ns.getConf(), context.getCryptoService())
            .withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter()).build();
  
-       Map<String,Set<ByteSequence>> lGroups;
-       try {
-         lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf);
-       } catch (LocalityGroupConfigurationError e) {
-         throw new IOException(e);
-       }
+       Map<String,Set<ByteSequence>> lGroups = getLocalityGroups(acuTableConf);
  
        long t1 = System.currentTimeMillis();
  
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index 263fd08,66a15ec..ae2ff45
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@@ -23,9 -22,13 +23,12 @@@ import java.security.SecureRandom
  import java.util.Collections;
  import java.util.Map;
  import java.util.Random;
+ import java.util.Set;
  import java.util.concurrent.TimeUnit;
  
 -import org.apache.accumulo.core.client.IteratorSetting;
 -import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.clientImpl.Tables;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.data.ByteSequence;
  import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
  import org.apache.accumulo.core.master.state.tables.TableState;
  import org.apache.accumulo.core.metadata.schema.DataFileValue;
@@@ -97,9 -101,15 +101,15 @@@ public class MinorCompactor extends Com
    }
  
    @Override
+   protected Map<String,Set<ByteSequence>> getLocalityGroups(AccumuloConfiguration acuTableConf)
+       throws IOException {
+     return LocalityGroupUtil.getLocalityGroupsIgnoringErrors(acuTableConf, extent.getTableId());
+   }
+ 
+   @Override
    public CompactionStats call() {
      final String outputFileName = getOutputFile();
 -    log.debug("Begin minor compaction " + outputFileName + " " + getExtent());
 +    log.debug("Begin minor compaction {} {}", outputFileName, getExtent());
  
      // output to new MapFile with a temporary name
      int sleepTime = 100;
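
The getLocalityGroups override above is what allows a minor compaction to proceed even when a
user has configured overlapping locality groups. A plausible shape for that fallback is
sketched below for illustration; the real logic lives in
LocalityGroupUtil.getLocalityGroupsIgnoringErrors, and the class name here is made up.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.data.ByteSequence;
    import org.apache.accumulo.core.util.LocalityGroupUtil;
    import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;

    class LocalityGroupFallback {
      // If the configured locality groups are invalid, fall back to no locality
      // groups so the in-memory map can still be flushed to a file.
      static Map<String,Set<ByteSequence>> groupsIgnoringErrors(AccumuloConfiguration conf) {
        try {
          return LocalityGroupUtil.getLocalityGroups(conf);
        } catch (LocalityGroupConfigurationError e) {
          return Collections.emptyMap();
        }
      }
    }
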
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index d128f02,98bf80a..c59a90c
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@@ -2485,6 -2497,38 +2485,38 @@@ public class Tablet implements TabletCo
      candidates.removeAll(referencedLogs);
    }
  
+   public void checkIfMinorCompactionNeededForLogs(List<DfsLogger> closedLogs) {
+ 
+     // grab this outside of tablet lock.
+     int maxLogs = tableConfiguration.getCount(Property.TABLE_MINC_LOGS_MAX);
+ 
+     String reason = null;
+     synchronized (this) {
+       if (currentLogs.size() >= maxLogs) {
+         reason = "referenced " + currentLogs.size() + " write ahead logs";
+       } else if (maxLogs < closedLogs.size()) {
+         // If many tablets each reference a single WAL, but each tablet references a different
+         // WAL, then the tablet server could end up referencing many WALs. For recovery that would
+         // mean each tablet had to process lots of WALs. This check looks for a single use of an
+         // older WAL and compacts if one is found. It assumes the most recent WALs are at the end
+         // of the list and ignores those.
+         List<DfsLogger> oldClosed = closedLogs.subList(0, closedLogs.size() - maxLogs);
+         for (DfsLogger closedLog : oldClosed) {
+           if (currentLogs.contains(closedLog)) {
+             reason = "referenced at least one old write ahead log " + closedLog.getFileName();
+             break;
+           }
+         }
+       }
+     }
+ 
+     if (reason != null) {
+       // initiate and log outside of tablet lock
+       initiateMinorCompaction(MinorCompactionReason.SYSTEM);
 -      log.debug("Initiating minor compaction for " + getExtent() + " because " + reason);
++      log.debug("Initiating minor compaction for {} because {}", getExtent(), reason);
+     }
+   }
+ 
    Set<String> beginClearingUnusedLogs() {
      Set<String> unusedLogs = new HashSet<>();
  
@@@ -2545,14 -2589,8 +2577,10 @@@
    // this lock is basically used to synchronize writing of log info to metadata
    private final ReentrantLock logLock = new ReentrantLock();
  
-   public synchronized int getLogCount() {
-     return currentLogs.size();
-   }
- 
    // don't release the lock if this method returns true for success; instead, the caller should
    // clean up by calling finishUpdatingLogsUsed()
 +  @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK",
 +      justification = "lock is released by caller calling finishUpdatingLogsUsed method")
    @Override
    public boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger more, boolean mincFinish) {
  
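
Restated as a pure function, the trigger implemented in checkIfMinorCompactionNeededForLogs
has two cases, which may be easier to see outside the locking above. The sketch below is
illustrative only and uses a type parameter in place of DfsLogger.

    import java.util.List;
    import java.util.Set;

    class WalCompactionCheck {
      // Returns a human readable reason to minor compact, or null if none.
      // closedLogs is assumed to be ordered oldest first, as in the comment above.
      static <L> String reasonToCompact(Set<L> currentLogs, List<L> closedLogs, int maxLogs) {
        if (currentLogs.size() >= maxLogs) {
          return "referenced " + currentLogs.size() + " write ahead logs";
        }
        if (maxLogs < closedLogs.size()) {
          // ignore the newest maxLogs entries at the end of the list
          for (L oldLog : closedLogs.subList(0, closedLogs.size() - maxLogs)) {
            if (currentLogs.contains(oldLog)) {
              return "referenced at least one old write ahead log " + oldLog;
            }
          }
        }
        return null;
      }
    }
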
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
index c812356,3a54e42..291e187
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
@@@ -22,8 -23,6 +22,7 @@@ import java.util.List
  
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
- import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 +import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.tserver.InMemoryMap;
  import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
  import org.slf4j.Logger;
@@@ -38,16 -37,10 +37,13 @@@ class TabletMemory implements Closeabl
    private InMemoryMap deletingMemTable;
    private long nextSeq = 1L;
    private CommitSession commitSession;
 +  private ServerContext context;
  
 -  TabletMemory(TabletCommitter tablet) {
 +  TabletMemory(Tablet tablet) {
      this.tablet = tablet;
 -    memTable = new InMemoryMap(tablet.getTableConfiguration(), tablet.getExtent().getTableId());
 +    this.context = tablet.getContext();
-     try {
-       memTable = new InMemoryMap(tablet.getTableConfiguration(), context);
-     } catch (LocalityGroupConfigurationError e) {
-       throw new RuntimeException(e);
-     }
++    memTable = new InMemoryMap(tablet.getTableConfiguration(), context,
++        tablet.getExtent().getTableId());
      commitSession = new CommitSession(tablet, nextSeq, memTable);
      nextSeq += 2;
    }
@@@ -73,11 -66,7 +69,8 @@@
      }
  
      otherMemTable = memTable;
-     try {
-       memTable = new InMemoryMap(tablet.getTableConfiguration(), context);
-     } catch (LocalityGroupConfigurationError e) {
-       throw new RuntimeException(e);
-     }
 -    memTable = new InMemoryMap(tablet.getTableConfiguration(), tablet.getExtent().getTableId());
++    memTable = new InMemoryMap(tablet.getTableConfiguration(), context,
++        tablet.getExtent().getTableId());
  
      CommitSession oldCommitSession = commitSession;
      commitSession = new CommitSession(tablet, nextSeq, memTable);
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
index cee92ee,3a08f5f..0a593aa
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
@@@ -40,7 -40,7 +40,8 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.sample.RowSampler;
  import org.apache.accumulo.core.client.sample.Sampler;
  import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 +import org.apache.accumulo.core.clientImpl.BaseIteratorEnvironment;
++import org.apache.accumulo.core.clientImpl.Table;
  import org.apache.accumulo.core.conf.ConfigurationCopy;
  import org.apache.accumulo.core.conf.DefaultConfiguration;
  import org.apache.accumulo.core.conf.Property;
@@@ -188,7 -178,7 +189,7 @@@ public class InMemoryMapTest 
      ConfigurationCopy config = new ConfigurationCopy(DefaultConfiguration.getInstance());
      config.set(Property.TSERV_NATIVEMAP_ENABLED, "" + useNative);
      config.set(Property.TSERV_MEMDUMP_DIR, memDumpDir);
-     return new InMemoryMap(config, getServerContext());
 -    return new InMemoryMap(config, "--TEST--");
++    return new InMemoryMap(config, getServerContext(), Table.ID.of("--TEST--"));
    }
  
    @Test
@@@ -561,7 -551,7 +562,7 @@@
          LocalityGroupUtil.encodeColumnFamilies(toTextSet("cf3", "cf4")));
      config.set(Property.TABLE_LOCALITY_GROUPS.getKey(), "lg1,lg2");
  
-     InMemoryMap imm = new InMemoryMap(config, getServerContext());
 -    InMemoryMap imm = new InMemoryMap(config, "--TEST--");
++    InMemoryMap imm = new InMemoryMap(config, getServerContext(), Table.ID.of("--TEST--"));
  
      Mutation m1 = new Mutation("r1");
      m1.put("cf1", "x", 2, "1");
@@@ -623,8 -613,8 +624,7 @@@
      }
  
      for (ConfigurationCopy config : Arrays.asList(config1, config2)) {
--
-       InMemoryMap imm = new InMemoryMap(config, getServerContext());
 -      InMemoryMap imm = new InMemoryMap(config, "--TEST--");
++      InMemoryMap imm = new InMemoryMap(config, getServerContext(), Table.ID.of("--TEST--"));
  
        TreeMap<Key,Value> expectedSample = new TreeMap<>();
        TreeMap<Key,Value> expectedAll = new TreeMap<>();
@@@ -710,7 -700,7 +710,7 @@@
        config1.set(entry.getKey(), entry.getValue());
      }
  
-     InMemoryMap imm = new InMemoryMap(config1, getServerContext());
 -    InMemoryMap imm = new InMemoryMap(config1, "--TEST--");
++    InMemoryMap imm = new InMemoryMap(config1, getServerContext(), Table.ID.of("--TEST--"));
  
      TreeMap<Key,Value> expectedSample = new TreeMap<>();
      TreeMap<Key,Value> expectedAll = new TreeMap<>();
@@@ -770,7 -760,7 +770,7 @@@
        config1.set(entry.getKey(), entry.getValue());
      }
  
-     InMemoryMap imm = new InMemoryMap(config1, getServerContext());
 -    InMemoryMap imm = new InMemoryMap(config1, "--TEST--");
++    InMemoryMap imm = new InMemoryMap(config1, getServerContext(), Table.ID.of("--TEST--"));
  
      mutate(imm, "r", "cf:cq", 5, "b");
  
@@@ -815,7 -805,7 +815,7 @@@
        config1.set(entry.getKey(), entry.getValue());
      }
  
-     InMemoryMap imm = new InMemoryMap(config1, getServerContext());
 -    InMemoryMap imm = new InMemoryMap(config1, "--TEST--");
++    InMemoryMap imm = new InMemoryMap(config1, getServerContext(), Table.ID.of("--TEST--"));
  
      // change sampler config after creating in mem map.
      SamplerConfigurationImpl sampleConfig2 = new SamplerConfigurationImpl(
diff --cc test/src/main/java/org/apache/accumulo/test/InMemoryMapIT.java
index 801e60a,e90ad3e..6c1b640
--- a/test/src/main/java/org/apache/accumulo/test/InMemoryMapIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/InMemoryMapIT.java
@@@ -31,9 -32,8 +31,11 @@@ import java.util.Map
  import java.util.Map.Entry;
  import java.util.Set;
  
++import org.apache.accumulo.core.clientImpl.Table;
++import org.apache.accumulo.core.clientImpl.Table.ID;
  import org.apache.accumulo.core.conf.ConfigurationCopy;
  import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.crypto.CryptoServiceFactory;
  import org.apache.accumulo.core.data.ArrayByteSequence;
  import org.apache.accumulo.core.data.ByteSequence;
  import org.apache.accumulo.core.data.Key;
@@@ -243,15 -236,14 +245,18 @@@ public class InMemoryMapIT 
        localityGroupNativeConfig.put(Property.TSERV_MEMDUMP_DIR.getKey(),
            tempFolder.newFolder().getAbsolutePath());
  
-       defaultMap = new InMemoryMap(new ConfigurationCopy(defaultMapConfig), getServerContext());
-       nativeMapWrapper = new InMemoryMap(new ConfigurationCopy(nativeMapConfig),
-           getServerContext());
 -      defaultMap = new InMemoryMap(new ConfigurationCopy(defaultMapConfig), "--TEST--");
 -      nativeMapWrapper = new InMemoryMap(new ConfigurationCopy(nativeMapConfig), "--TEST--");
++      ID testId = Table.ID.of("TEST");
++
++      defaultMap = new InMemoryMap(new ConfigurationCopy(defaultMapConfig), getServerContext(),
++          testId);
++      nativeMapWrapper = new InMemoryMap(new ConfigurationCopy(nativeMapConfig), getServerContext(),
++          testId);
        localityGroupMap = new InMemoryMap(
            updateConfigurationForLocalityGroups(new ConfigurationCopy(localityGroupConfig)),
-           getServerContext());
 -          "--TEST--");
++          getServerContext(), testId);
        localityGroupMapWithNative = new InMemoryMap(
            updateConfigurationForLocalityGroups(new ConfigurationCopy(localityGroupNativeConfig)),
-           getServerContext());
 -          "--TEST--");
++          getServerContext(), testId);
      } catch (Exception e) {
        log.error("Error getting new InMemoryMap ", e);
        fail(e.getMessage());
diff --cc test/src/main/java/org/apache/accumulo/test/functional/BadLocalityGroupMincIT.java
index 0000000,d20c578..8218f62
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BadLocalityGroupMincIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BadLocalityGroupMincIT.java
@@@ -1,0 -1,89 +1,91 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.functional;
+ 
+ import static java.nio.charset.StandardCharsets.UTF_8;
+ import static org.junit.Assert.assertEquals;
+ 
 -import java.util.HashMap;
 -import java.util.Map;
+ import java.util.Map.Entry;
+ 
++import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.BatchWriterConfig;
 -import org.apache.accumulo.core.client.Connector;
+ import org.apache.accumulo.core.client.Scanner;
 -import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.harness.AccumuloClusterHarness;
+ import org.apache.hadoop.io.Text;
+ import org.junit.Test;
+ 
+ import com.google.common.collect.Iterables;
+ 
+ public class BadLocalityGroupMincIT extends AccumuloClusterHarness {
+ 
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 60;
+   }
+ 
+   @Test
+   public void test() throws Exception {
 -    Connector c = getConnector();
++    try (AccumuloClient c = createAccumuloClient()) {
+ 
 -    String tableName = getUniqueNames(1)[0];
 -    Map<String,String> props = new HashMap<>();
++      String tableName = getUniqueNames(1)[0];
+ 
 -    // intentionally bad locality group config where two groups share a family
 -    props.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "g1", "fam1,fam2");
 -    props.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "g2", "fam2,fam3");
 -    props.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "g1,g2");
++      c.tableOperations().create(tableName);
+ 
 -    c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props));
++      // intentionally bad locality group config where two groups share a family
++      c.tableOperations().setProperty(tableName,
++          Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "g1", "fam1,fam2");
++      c.tableOperations().setProperty(tableName,
++          Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "g2", "fam2,fam3");
++      c.tableOperations().setProperty(tableName, Property.TABLE_LOCALITY_GROUPS.getKey(), "g1,g2");
+ 
 -    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 -    Mutation m = new Mutation(new Text("r1"));
 -    m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8)));
++      c.tableOperations().offline(tableName, true);
++      c.tableOperations().online(tableName, true);
+ 
 -    bw.addMutation(m);
 -    bw.close();
++      BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
++      Mutation m = new Mutation(new Text("r1"));
++      m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8)));
+ 
 -    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 0, 0);
++      bw.addMutation(m);
++      bw.close();
+ 
 -    // even with bad locality group config, the minor compaction should still work
 -    c.tableOperations().flush(tableName, null, null, true);
++      FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 0, 0);
+ 
 -    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 1, 1);
++      // even with bad locality group config, the minor compaction should still work
++      c.tableOperations().flush(tableName, null, null, true);
+ 
 -    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
 -    Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
++      FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 1, 1);
+ 
 -    assertEquals("r1", entry.getKey().getRowData().toString());
 -    assertEquals("acf", entry.getKey().getColumnFamilyData().toString());
 -    assertEquals(tableName, entry.getKey().getColumnQualifierData().toString());
 -    assertEquals("1", entry.getValue().toString());
++      Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
++      Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
+ 
 -    // this should not hang
 -    c.tableOperations().delete(tableName);
++      assertEquals("r1", entry.getKey().getRowData().toString());
++      assertEquals("acf", entry.getKey().getColumnFamilyData().toString());
++      assertEquals(tableName, entry.getKey().getColumnQualifierData().toString());
++      assertEquals("1", entry.getValue().toString());
++
++      // this should not hang
++      c.tableOperations().delete(tableName);
++    }
+   }
+ 
+ }
diff --cc test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
index 0000000,ed97c07..b6c5eb0
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
@@@ -1,0 -1,265 +1,254 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.functional;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.fail;
+ 
+ import java.io.IOException;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Set;
+ import java.util.SortedMap;
+ import java.util.TreeMap;
+ 
+ import org.apache.accumulo.core.Constants;
 -import org.apache.accumulo.core.client.AccumuloException;
 -import org.apache.accumulo.core.client.AccumuloSecurityException;
 -import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.Scanner;
+ import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.client.admin.CompactionConfig;
 -import org.apache.accumulo.core.client.impl.ClientContext;
 -import org.apache.accumulo.core.client.impl.TabletLocator;
 -import org.apache.accumulo.core.client.impl.Translator;
 -import org.apache.accumulo.core.client.impl.Translators;
+ import org.apache.accumulo.core.client.rfile.RFile;
+ import org.apache.accumulo.core.client.rfile.RFileWriter;
++import org.apache.accumulo.core.clientImpl.ClientContext;
++import org.apache.accumulo.core.clientImpl.Table;
++import org.apache.accumulo.core.clientImpl.TabletLocator;
++import org.apache.accumulo.core.clientImpl.Translator;
++import org.apache.accumulo.core.clientImpl.Translators;
+ import org.apache.accumulo.core.conf.Property;
 -import org.apache.accumulo.core.conf.SiteConfiguration;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Value;
 -import org.apache.accumulo.core.data.impl.KeyExtent;
 -import org.apache.accumulo.core.data.thrift.MapFileInfo;
 -import org.apache.accumulo.core.data.thrift.TKeyExtent;
++import org.apache.accumulo.core.dataImpl.KeyExtent;
++import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
++import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+ import org.apache.accumulo.core.metadata.MetadataTable;
+ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
+ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+ import org.apache.accumulo.core.rpc.ThriftUtil;
+ import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+ import org.apache.accumulo.core.trace.Tracer;
+ import org.apache.accumulo.core.util.HostAndPort;
+ import org.apache.accumulo.harness.AccumuloClusterHarness;
 -import org.apache.accumulo.master.tableOps.BulkImport;
 -import org.apache.accumulo.server.AccumuloServerContext;
 -import org.apache.accumulo.server.client.HdfsZooInstance;
 -import org.apache.accumulo.server.conf.ServerConfigurationFactory;
++import org.apache.accumulo.master.tableOps.bulkVer1.BulkImport;
++import org.apache.accumulo.server.ServerContext;
+ import org.apache.accumulo.server.fs.VolumeManager;
 -import org.apache.accumulo.server.fs.VolumeManagerImpl;
+ import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.Text;
+ import org.apache.thrift.TApplicationException;
+ import org.apache.thrift.TServiceClient;
+ import org.junit.Test;
+ 
+ import com.google.common.collect.ImmutableMap;
+ import com.google.common.collect.ImmutableSet;
+ 
+ public class BulkFailureIT extends AccumuloClusterHarness {
+ 
+   /**
+    * This test verifies two things. First, it ensures that after a bulk imported file is compacted,
+    * subsequent import requests are ignored. Second, it ensures that after the bulk import
+    * transaction is canceled, import requests fail. The public API for bulk import cannot be used
+    * for this test; internal (non-public API) RPCs and ZooKeeper state are manipulated directly.
+    * This is the only way to interleave compactions with multiple, duplicate import RPC requests.
+    */
+   @Test
+   public void testImportCompactionImport() throws Exception {
 -    Connector c = getConnector();
 -    String table = getUniqueNames(1)[0];
++    try (AccumuloClient c = createAccumuloClient()) {
++      String table = getUniqueNames(1)[0];
+ 
 -    SortedMap<Key,Value> testData = createTestData();
++      SortedMap<Key,Value> testData = createTestData();
+ 
 -    FileSystem fs = getCluster().getFileSystem();
 -    String testFile = createTestFile(testData, fs);
++      FileSystem fs = getCluster().getFileSystem();
++      String testFile = createTestFile(testData, fs);
+ 
 -    c.tableOperations().create(table);
 -    String tableId = c.tableOperations().tableIdMap().get(table);
++      c.tableOperations().create(table);
++      String tableId = c.tableOperations().tableIdMap().get(table);
+ 
 -    // Table has no splits, so this extent corresponds to the tables single tablet
 -    KeyExtent extent = new KeyExtent(tableId.toString(), null, null);
++      // Table has no splits, so this extent corresponds to the table's single tablet
++      KeyExtent extent = new KeyExtent(Table.ID.of(tableId), null, null);
+ 
 -    // Set up site configuration because this test uses server side code that expects it.
 -    setupSiteConfig();
++      long fateTxid = 99999999L;
+ 
 -    long fateTxid = 99999999L;
++      ServerContext asCtx = getServerContext();
++      ZooArbitrator.start(asCtx, Constants.BULK_ARBITRATOR_TYPE, fateTxid);
+ 
 -    AccumuloServerContext asCtx = new AccumuloServerContext(
 -        new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
 -    ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, fateTxid);
++      VolumeManager vm = asCtx.getVolumeManager();
+ 
 -    VolumeManager vm = VolumeManagerImpl.get();
++      // move the file into a directory for the table and rename the file to something unique
++      String bulkDir = BulkImport.prepareBulkImport(asCtx, vm, testFile, Table.ID.of(tableId));
+ 
 -    // move the file into a directory for the table and rename the file to something unique
 -    String bulkDir = BulkImport.prepareBulkImport(asCtx, vm, testFile, tableId);
++      // determine the file's new name and path
++      FileStatus status = fs.listStatus(new Path(bulkDir))[0];
++      Path bulkLoadPath = fs.makeQualified(status.getPath());
+ 
 -    // determine the files new name and path
 -    FileStatus status = fs.listStatus(new Path(bulkDir))[0];
 -    Path bulkLoadPath = fs.makeQualified(status.getPath());
++      // Directly ask the tablet to load the file.
++      assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
+ 
 -    // Directly ask the tablet to load the file.
 -    assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
++      assertEquals(ImmutableSet.of(bulkLoadPath), getFiles(c, extent));
++      assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
++      assertEquals(testData, readTable(table, c));
+ 
 -    assertEquals(ImmutableSet.of(bulkLoadPath), getFiles(c, extent));
 -    assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
 -    assertEquals(testData, readTable(table, c));
++      // Compact the bulk imported file. Subsequent requests to load the file should be ignored.
++      c.tableOperations().compact(table, new CompactionConfig().setWait(true));
+ 
 -    // Compact the bulk imported file. Subsequent request to load the file should be ignored.
 -    c.tableOperations().compact(table, new CompactionConfig().setWait(true));
++      Set<Path> tabletFiles = getFiles(c, extent);
++      assertFalse(tabletFiles.contains(bulkLoadPath));
++      assertEquals(1, tabletFiles.size());
++      assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
++      assertEquals(testData, readTable(table, c));
+ 
 -    Set<Path> tabletFiles = getFiles(c, extent);
 -    assertFalse(tabletFiles.contains(bulkLoadPath));
 -    assertEquals(1, tabletFiles.size());
 -    assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
 -    assertEquals(testData, readTable(table, c));
++      // this request should be ignored by the tablet
++      assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
+ 
 -    // this request should be ignored by the tablet
 -    assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
++      assertEquals(tabletFiles, getFiles(c, extent));
++      assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
++      assertEquals(testData, readTable(table, c));
+ 
 -    assertEquals(tabletFiles, getFiles(c, extent));
 -    assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
 -    assertEquals(testData, readTable(table, c));
++      // this is done to ensure the tablet reads the load flags from the metadata table when it
++      // loads
++      c.tableOperations().offline(table, true);
++      c.tableOperations().online(table, true);
+ 
 -    // this is done to ensure the tablet reads the load flags from the metadata table when it loads
 -    c.tableOperations().offline(table, true);
 -    c.tableOperations().online(table, true);
++      // this request should be ignored by the tablet
++      assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
+ 
 -    // this request should be ignored by the tablet
 -    assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
++      assertEquals(tabletFiles, getFiles(c, extent));
++      assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
++      assertEquals(testData, readTable(table, c));
+ 
 -    assertEquals(tabletFiles, getFiles(c, extent));
 -    assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
 -    assertEquals(testData, readTable(table, c));
++      // After this, all load requests should fail.
++      ZooArbitrator.stop(asCtx, Constants.BULK_ARBITRATOR_TYPE, fateTxid);
+ 
 -    // After this, all load request should fail.
 -    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, fateTxid);
++      try {
++        // expect this to fail
++        assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
++        fail();
++      } catch (TApplicationException tae) {
+ 
 -    try {
 -      // expect this to fail
 -      assignMapFiles(fateTxid, asCtx, extent, bulkLoadPath.toString(), status.getLen());
 -      fail();
 -    } catch (TApplicationException tae) {
++      }
+ 
++      assertEquals(tabletFiles, getFiles(c, extent));
++      assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
++      assertEquals(testData, readTable(table, c));
+     }
 -
 -    assertEquals(tabletFiles, getFiles(c, extent));
 -    assertEquals(ImmutableSet.of(bulkLoadPath), getLoaded(c, extent));
 -    assertEquals(testData, readTable(table, c));
+   }
+ 
+   private SortedMap<Key,Value> createTestData() {
+     SortedMap<Key,Value> testData = new TreeMap<>();
+     testData.put(new Key("r001", "f002", "q009", 56), new Value("v001"));
+     testData.put(new Key("r001", "f002", "q019", 56), new Value("v002"));
+     testData.put(new Key("r002", "f002", "q009", 57), new Value("v003"));
+     testData.put(new Key("r002", "f002", "q019", 57), new Value("v004"));
+     return testData;
+   }
+ 
+   private String createTestFile(SortedMap<Key,Value> testData, FileSystem fs) throws IOException {
+     Path base = new Path(getCluster().getTemporaryPath(), "testBulk_ICI");
+ 
+     fs.delete(base, true);
+     fs.mkdirs(base);
+     Path files = new Path(base, "files");
+ 
+     try (RFileWriter writer = RFile.newWriter().to(new Path(files, "ici_01.rf").toString())
+         .withFileSystem(fs).build()) {
+       writer.append(testData.entrySet());
+     }
+ 
+     String filesStr = fs.makeQualified(files).toString();
+     return filesStr;
+   }
+ 
 -  private SortedMap<Key,Value> readTable(String table, Connector connector)
++  private SortedMap<Key,Value> readTable(String table, AccumuloClient connector)
+       throws TableNotFoundException {
+     Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
+ 
+     SortedMap<Key,Value> actual = new TreeMap<>();
+ 
+     for (Entry<Key,Value> entry : scanner) {
+       actual.put(entry.getKey(), entry.getValue());
+     }
+ 
+     return actual;
+   }
+ 
 -  public Set<Path> getLoaded(Connector connector, KeyExtent extent) throws TableNotFoundException {
++  public Set<Path> getLoaded(AccumuloClient connector, KeyExtent extent)
++      throws TableNotFoundException {
+     return getPaths(connector, extent, BulkFileColumnFamily.NAME);
+   }
+ 
 -  public Set<Path> getFiles(Connector connector, KeyExtent extent) throws TableNotFoundException {
++  public Set<Path> getFiles(AccumuloClient connector, KeyExtent extent)
++      throws TableNotFoundException {
+     return getPaths(connector, extent, DataFileColumnFamily.NAME);
+   }
+ 
 -  private Set<Path> getPaths(Connector connector, KeyExtent extent, Text fam)
++  private Set<Path> getPaths(AccumuloClient connector, KeyExtent extent, Text fam)
+       throws TableNotFoundException {
+     HashSet<Path> files = new HashSet<>();
+ 
+     Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+     scanner.setRange(extent.toMetadataRange());
+     scanner.fetchColumnFamily(fam);
+ 
+     for (Entry<Key,Value> entry : scanner) {
+       files.add(new Path(entry.getKey().getColumnQualifierData().toString()));
+     }
+ 
+     return files;
+   }
+ 
+   private List<KeyExtent> assignMapFiles(long txid, ClientContext context, KeyExtent extent,
+       String path, long size) throws Exception {
+ 
+     TabletLocator locator = TabletLocator.getLocator(context, extent.getTableId());
+ 
+     locator.invalidateCache(extent);
+ 
+     HostAndPort location = HostAndPort
+         .fromString(locator.locateTablet(context, new Text(""), false, true).tablet_location);
+ 
+     long timeInMillis = context.getConfiguration().getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
+     TabletClientService.Iface client = ThriftUtil.getTServerClient(location, context, timeInMillis);
+     try {
+ 
+       Map<String,MapFileInfo> val = ImmutableMap.of(path, new MapFileInfo(size));
+       Map<KeyExtent,Map<String,MapFileInfo>> files = ImmutableMap.of(extent, val);
+ 
+       List<TKeyExtent> failures = client.bulkImport(Tracer.traceInfo(), context.rpcCreds(), txid,
+           Translator.translate(files, Translators.KET), false);
+ 
+       return Translator.translate(failures, Translators.TKET);
+     } finally {
+       ThriftUtil.returnClient((TServiceClient) client);
+     }
+ 
+   }
 -
 -  private void setupSiteConfig() throws AccumuloException, AccumuloSecurityException {
 -    for (Entry<String,String> entry : getCluster().getSiteConfiguration()) {
 -      SiteConfiguration.getInstance().set(entry.getKey(), entry.getValue());
 -    }
 -  }
+ }
diff --cc test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
index e223346,c94a93f..8a60208
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
@@@ -128,19 -113,4 +128,17 @@@ public class BulkIT extends AccumuloClu
      vopts.rows = 1;
      VerifyIngest.verifyIngest(c, vopts, SOPTS);
    }
 +
 +  @SuppressWarnings("deprecation")
 +  private static void bulkLoad(AccumuloClient c, String tableName, Path bulkFailures, Path files,
 +      boolean useOld)
 +      throws TableNotFoundException, IOException, AccumuloException, AccumuloSecurityException {
 +    // Make sure the server can modify the files
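 +    // useOld exercises the deprecated importDirectory call; otherwise the new bulk
 +    // import builder is used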
 +    if (useOld) {
 +      c.tableOperations().importDirectory(tableName, files.toString(), bulkFailures.toString(),
 +          false);
 +    } else {
 +      c.tableOperations().importDirectory(files.toString()).to(tableName).load();
 +    }
- 
 +  }
- 
  }
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ManyWriteAheadLogsIT.java
index 0000000,e5b9065..3c66cd5
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ManyWriteAheadLogsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ManyWriteAheadLogsIT.java
@@@ -1,0 -1,181 +1,186 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.accumulo.test.functional;
+ 
+ import static org.junit.Assert.assertTrue;
+ 
++import java.util.Base64;
+ import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Random;
+ import java.util.Set;
+ import java.util.SortedSet;
+ import java.util.TreeSet;
+ 
++import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.BatchWriterConfig;
 -import org.apache.accumulo.core.client.Connector;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Mutation;
 -import org.apache.accumulo.core.util.Base64;
+ import org.apache.accumulo.harness.AccumuloClusterHarness;
 -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
++import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
++import org.apache.accumulo.server.ServerContext;
+ import org.apache.accumulo.server.log.WalStateManager.WalState;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.RawLocalFileSystem;
+ import org.apache.hadoop.io.Text;
+ import org.junit.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class ManyWriteAheadLogsIT extends AccumuloClusterHarness {
+ 
+   private static final Logger log = LoggerFactory.getLogger(ManyWriteAheadLogsIT.class);
+ 
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+     // configure a smaller walog size so the walogs will roll frequently in the test
+     cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+     cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
+     cfg.setProperty(Property.GC_CYCLE_START, "1");
+     cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
+     cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+     // idle compactions may address the problem this test creates; however, they will not prevent
+     // lots of closed WALs for all write patterns. This test ensures the code that directly handles
+     // many tablets referencing many different WALs is working.
+     cfg.setProperty(Property.TABLE_MINC_COMPACT_IDLETIME, "1h");
+     cfg.setNumTservers(1);
+     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+   }
+ 
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 10 * 60;
+   }
+ 
+   /**
+    * This creates a situation where many tablets reference many different write ahead logs, but no
+    * single tablet references a lot of write ahead logs. The test ensures the tablet server forces
+    * minor compactions in this situation.
+    */
+   @Test
+   public void testMany() throws Exception {
+     SortedSet<Text> splits = new TreeSet<>();
+     for (int i = 1; i < 100; i++) {
+       splits.add(new Text(String.format("%05x", i * 100)));
+     }
+ 
 -    Connector c = getConnector();
 -    String[] tableNames = getUniqueNames(2);
++    ServerContext ctx = getServerContext();
+ 
 -    String manyWALsTable = tableNames[0];
 -    String rollWALsTable = tableNames[1];
 -    c.tableOperations().create(manyWALsTable);
 -    c.tableOperations().addSplits(manyWALsTable, splits);
++    try (AccumuloClient c = createAccumuloClient()) {
++      String[] tableNames = getUniqueNames(2);
+ 
 -    c.tableOperations().create(rollWALsTable);
++      String manyWALsTable = tableNames[0];
++      String rollWALsTable = tableNames[1];
++      c.tableOperations().create(manyWALsTable);
++      c.tableOperations().addSplits(manyWALsTable, splits);
+ 
 -    Random rand = new Random();
++      c.tableOperations().create(rollWALsTable);
+ 
 -    Set<String> allWalsSeen = new HashSet<>();
++      Random rand = new Random();
+ 
 -    addOpenWals(c, allWalsSeen);
++      Set<String> allWalsSeen = new HashSet<>();
+ 
 -    // This test creates the table manyWALsTable with a lot of tablets and writes a little bit to
 -    // each tablet. In between writing a little bit to each tablet a lot of data is written to
 -    // another table called rollWALsTable. Writing a lot causes the write ahead logs to roll. This
 -    // write pattern should cause the tablets in the manyWALsTable table to reference many closed
 -    // WALs. If nothing is done about all of these closed WALs, then it could cause a large burden
 -    // at recovery time.
++      addOpenWals(ctx, allWalsSeen);
+ 
 -    try (BatchWriter manyWALsWriter = c.createBatchWriter(manyWALsTable, new BatchWriterConfig());
 -        BatchWriter rollWALsWriter = c.createBatchWriter(rollWALsTable, new BatchWriterConfig())) {
++      // This test creates the table manyWALsTable with a lot of tablets and writes a little bit to
++      // each tablet. In between writing a little bit to each tablet, a lot of data is written to
++      // another table called rollWALsTable. Writing a lot causes the write ahead logs to roll. This
++      // write pattern should cause the tablets in manyWALsTable to reference many closed WALs. If
++      // nothing is done about all of these closed WALs, it could cause a large burden at recovery
++      // time.
+ 
 -      byte[] val = new byte[768];
++      try (BatchWriter manyWALsWriter = c.createBatchWriter(manyWALsTable, new BatchWriterConfig());
++          BatchWriter rollWALsWriter = c.createBatchWriter(rollWALsTable,
++              new BatchWriterConfig())) {
+ 
 -      for (int i = 0; i < 100; i++) {
 -        int startRow = i * 100;
++        byte[] val = new byte[768];
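++        // 768 random bytes Base64-encode to ~1 KB, so each batch of 1000 mutations written to
++        // rollWALsTable below approaches the 1M WAL size cap and forces the logs to roll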
+ 
 -        // write a small amount of data to each tablet in the table
 -        for (int j = 0; j < 10; j++) {
 -          int row = startRow + j;
 -          Mutation m = new Mutation(String.format("%05x", row));
 -          rand.nextBytes(val);
 -          m.put("f", "q", "v");
++        for (int i = 0; i < 100; i++) {
++          int startRow = i * 100;
+ 
 -          manyWALsWriter.addMutation(m);
 -        }
 -        manyWALsWriter.flush();
++          // write a small amount of data to each tablet in the table
++          for (int j = 0; j < 10; j++) {
++            int row = startRow + j;
++            Mutation m = new Mutation(String.format("%05x", row));
++            rand.nextBytes(val);
++            m.put("f", "q", "v");
+ 
 -        // write a lot of data to the second table to force the logs to roll
 -        for (int j = 0; j < 1000; j++) {
 -          Mutation m = new Mutation(String.format("%03d", j));
 -          rand.nextBytes(val);
++            manyWALsWriter.addMutation(m);
++          }
++          manyWALsWriter.flush();
+ 
 -          m.put("f", "q", Base64.encodeBase64String(val));
++          // write a lot of data to the second table to force the logs to roll
++          for (int j = 0; j < 1000; j++) {
++            Mutation m = new Mutation(String.format("%03d", j));
++            rand.nextBytes(val);
+ 
 -          rollWALsWriter.addMutation(m);
 -        }
++            m.put("f", "q", Base64.getEncoder().encodeToString(val));
++
++            rollWALsWriter.addMutation(m);
++          }
+ 
 -        rollWALsWriter.flush();
++          rollWALsWriter.flush();
+ 
 -        // keep track of the open WALs as the test runs. Should see a lot of open WALs over the
 -        // lifetime of the test, but never a lot at any one time.
 -        addOpenWals(c, allWalsSeen);
++          // keep track of the open WALs as the test runs. We should see a lot of open WALs over
++          // the lifetime of the test, but never a lot at any one time.
++          addOpenWals(ctx, allWalsSeen);
++        }
+       }
 -    }
+ 
 -    assertTrue("Number of WALs seen was less than expected " + allWalsSeen.size(),
 -        allWalsSeen.size() >= 50);
++      assertTrue("Number of WALs seen was less than expected " + allWalsSeen.size(),
++          allWalsSeen.size() >= 50);
+ 
 -    // the total number of closed write ahead logs should get small
 -    int closedLogs = countClosedWals(c);
 -    while (closedLogs > 3) {
 -      log.debug("Waiting for wals to shrink " + closedLogs);
 -      Thread.sleep(250);
 -      closedLogs = countClosedWals(c);
++      // the total number of closed write ahead logs should get small
++      int closedLogs = countClosedWals(ctx);
++      while (closedLogs > 3) {
++        log.debug("Waiting for wals to shrink " + closedLogs);
++        Thread.sleep(250);
++        closedLogs = countClosedWals(ctx);
++      }
+     }
+   }
+ 
 -  private void addOpenWals(Connector c, Set<String> allWalsSeen) throws Exception {
++  private void addOpenWals(ServerContext c, Set<String> allWalsSeen) throws Exception {
+     Map<String,WalState> wals = WALSunnyDayIT._getWals(c);
+     Set<Entry<String,WalState>> es = wals.entrySet();
+     int open = 0;
+     for (Entry<String,WalState> entry : es) {
+       if (entry.getValue() == WalState.OPEN) {
+         open++;
+         allWalsSeen.add(entry.getKey());
+       }
+     }
+ 
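+     // only a handful of WALs should ever be open at once, even though many are seen over the test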
+     assertTrue("Open WALs not in expected range " + open, open > 0 && open < 4);
+   }
+ 
 -  private int countClosedWals(Connector c) throws Exception {
++  private int countClosedWals(ServerContext c) throws Exception {
+     int count = 0;
+     Map<String,WalState> wals = WALSunnyDayIT._getWals(c);
+     for (WalState ws : wals.values()) {
+       if (ws == WalState.CLOSED) {
+         count++;
+       }
+     }
+ 
+     return count;
+   }
+ }
diff --cc test/src/main/java/org/apache/accumulo/test/functional/RowDeleteIT.java
index 70cec19,54a0faf..496ef15
--- a/test/src/main/java/org/apache/accumulo/test/functional/RowDeleteIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RowDeleteIT.java
@@@ -59,55 -59,50 +59,54 @@@ public class RowDeleteIT extends Accumu
  
    @Test
    public void run() throws Exception {
 -    Connector c = getConnector();
 -    String tableName = getUniqueNames(1)[0];
 -    c.tableOperations().create(tableName);
 -    Map<String,Set<Text>> groups = new HashMap<>();
 -    groups.put("lg1", Collections.singleton(new Text("foo")));
 -    c.tableOperations().setLocalityGroups(tableName, groups);
 -    IteratorSetting setting = new IteratorSetting(30, RowDeletingIterator.class);
 -    c.tableOperations().attachIterator(tableName, setting, EnumSet.of(IteratorScope.majc));
 -    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "100");
 +    try (AccumuloClient c = createAccumuloClient()) {
 +      String tableName = getUniqueNames(1)[0];
 +      c.tableOperations().create(tableName);
 +      Map<String,Set<Text>> groups = new HashMap<>();
 +      groups.put("lg1", Collections.singleton(new Text("foo")));
-       groups.put("dg", Collections.emptySet());
 +      c.tableOperations().setLocalityGroups(tableName, groups);
 +      IteratorSetting setting = new IteratorSetting(30, RowDeletingIterator.class);
 +      c.tableOperations().attachIterator(tableName, setting, EnumSet.of(IteratorScope.majc));
 +      c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "100");
  
 -    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +      BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
  
 -    bw.addMutation(nm("r1", "foo", "cf1", "v1"));
 -    bw.addMutation(nm("r1", "bar", "cf1", "v2"));
 +      bw.addMutation(nm("r1", "foo", "cf1", "v1"));
 +      bw.addMutation(nm("r1", "bar", "cf1", "v2"));
  
 -    bw.flush();
 -    c.tableOperations().flush(tableName, null, null, true);
 +      bw.flush();
 +      c.tableOperations().flush(tableName, null, null, true);
  
 -    checkRFiles(c, tableName, 1, 1, 1, 1);
 +      checkRFiles(c, tableName, 1, 1, 1, 1);
  
 -    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
 -    int count = Iterators.size(scanner.iterator());
 -    assertEquals("count == " + count, 2, count);
 +      int count;
 +      try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
 +        count = Iterators.size(scanner.iterator());
 +        assertEquals("count == " + count, 2, count);
  
 -    bw.addMutation(nm("r1", "", "", RowDeletingIterator.DELETE_ROW_VALUE));
 +        bw.addMutation(nm("r1", "", "", RowDeletingIterator.DELETE_ROW_VALUE));
  
 -    bw.flush();
 -    c.tableOperations().flush(tableName, null, null, true);
 +        bw.flush();
 +        c.tableOperations().flush(tableName, null, null, true);
  
 -    checkRFiles(c, tableName, 1, 1, 2, 2);
 +        checkRFiles(c, tableName, 1, 1, 2, 2);
 +      }
  
 -    scanner = c.createScanner(tableName, Authorizations.EMPTY);
 -    count = Iterators.size(scanner.iterator());
 -    assertEquals("count == " + count, 3, count);
 +      try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
 +        count = Iterators.size(scanner.iterator());
 +        assertEquals("count == " + count, 3, count);
  
 -    c.tableOperations().compact(tableName, null, null, false, true);
 +        c.tableOperations().compact(tableName, null, null, false, true);
  
 -    checkRFiles(c, tableName, 1, 1, 0, 0);
 -
 -    scanner = c.createScanner(tableName, Authorizations.EMPTY);
 -    count = Iterators.size(scanner.iterator());
 -    assertEquals("count == " + count, 0, count);
 -    bw.close();
 +        checkRFiles(c, tableName, 1, 1, 0, 0);
 +      }
  
 +      try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
 +        count = Iterators.size(scanner.iterator());
 +        assertEquals("count == " + count, 0, count);
 +        bw.close();
 +      }
 +    }
    }
  
  }
diff --cc test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index d882abb,702bce9..7fd76b8
--- a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@@ -52,12 -52,13 +52,13 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
  import org.apache.accumulo.master.state.SetGoalState;
 -import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
 -import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterControl;
 +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.server.log.WalStateManager;
+ import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
  import org.apache.accumulo.server.log.WalStateManager.WalState;
 -import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.RawLocalFileSystem;
@@@ -95,82 -97,77 +97,80 @@@ public class WALSunnyDayIT extends Conf
      MiniAccumuloClusterImpl mac = getCluster();
      MiniAccumuloClusterControl control = mac.getClusterControl();
      control.stop(GARBAGE_COLLECTOR);
 -    Connector c = getConnector();
 -    String tableName = getUniqueNames(1)[0];
 -    c.tableOperations().create(tableName);
 -    writeSomeData(c, tableName, 1, 1);
 +    ServerContext context = getServerContext();
 +    try (AccumuloClient c = createClient()) {
 +      String tableName = getUniqueNames(1)[0];
 +      c.tableOperations().create(tableName);
 +      writeSomeData(c, tableName, 1, 1);
  
 -    // wal markers are added lazily
 -    Map<String,WalState> wals = getWALsAndAssertCount(c, 2);
 -    assertEquals("all WALs should be in use", 2, countInUse(wals.values()));
 +      // wal markers are added lazily
-       Map<String,Boolean> wals = getWALsAndAssertCount(context, 2);
-       for (Boolean b : wals.values()) {
-         assertTrue("logs should be in use", b);
-       }
++      Map<String,WalState> wals = getWALsAndAssertCount(context, 2);
++      assertEquals("all WALs should be in use", 2, countInUse(wals.values()));
  
 -    // roll log, get a new next
 -    writeSomeData(c, tableName, 1001, 50);
 -    Map<String,WalState> walsAfterRoll = getWALsAndAssertCount(c, 3);
 -    assertTrue("new WALs should be a superset of the old WALs",
 -        walsAfterRoll.keySet().containsAll(wals.keySet()));
 -    assertEquals("all WALs should be in use", 3, countInUse(walsAfterRoll.values()));
 +      // roll log, get a new next
 +      writeSomeData(c, tableName, 1001, 50);
-       Map<String,Boolean> walsAfterRoll = getWALsAndAssertCount(context, 3);
++      Map<String,WalState> walsAfterRoll = getWALsAndAssertCount(context, 3);
 +      assertTrue("new WALs should be a superset of the old WALs",
 +          walsAfterRoll.keySet().containsAll(wals.keySet()));
-       assertEquals("all WALs should be in use", 3, countTrue(walsAfterRoll.values()));
++      assertEquals("all WALs should be in use", 3, countInUse(walsAfterRoll.values()));
  
 -    // flush the tables
 -    for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
 -      c.tableOperations().flush(table, null, null, true);
 -    }
 -    sleepUninterruptibly(1, TimeUnit.SECONDS);
 -    // rolled WAL is no longer in use, but needs to be GC'd
 -    Map<String,WalState> walsAfterflush = getWALsAndAssertCount(c, 3);
 -    assertEquals("inUse should be 2", 2, countInUse(walsAfterflush.values()));
 +      // flush the tables
 +      for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
 +        c.tableOperations().flush(table, null, null, true);
 +      }
 +      sleepUninterruptibly(1, TimeUnit.SECONDS);
 +      // rolled WAL is no longer in use, but needs to be GC'd
-       Map<String,Boolean> walsAfterflush = getWALsAndAssertCount(context, 3);
-       assertEquals("inUse should be 2", 2, countTrue(walsAfterflush.values()));
++      Map<String,WalState> walsAfterflush = getWALsAndAssertCount(context, 3);
++      assertEquals("inUse should be 2", 2, countInUse(walsAfterflush.values()));
  
 -    // let the GC run for a little bit
 -    control.start(GARBAGE_COLLECTOR);
 -    sleepUninterruptibly(5, TimeUnit.SECONDS);
 -    // make sure the unused WAL goes away
 -    getWALsAndAssertCount(c, 2);
 -    control.stop(GARBAGE_COLLECTOR);
 -    // restart the tserver, but don't run recovery on all tablets
 -    control.stop(TABLET_SERVER);
 -    // this delays recovery on the normal tables
 -    assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
 -    control.start(TABLET_SERVER);
 +      // let the GC run for a little bit
 +      control.start(GARBAGE_COLLECTOR);
 +      sleepUninterruptibly(5, TimeUnit.SECONDS);
 +      // make sure the unused WAL goes away
 +      getWALsAndAssertCount(context, 2);
 +      control.stop(GARBAGE_COLLECTOR);
 +      // restart the tserver, but don't run recovery on all tablets
 +      control.stop(TABLET_SERVER);
 +      // this delays recovery on the normal tables
 +      assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
 +      control.start(TABLET_SERVER);
  
 -    // wait for the metadata table to go back online
 -    getRecoveryMarkers(c);
 -    // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
 -    sleepUninterruptibly(5, TimeUnit.SECONDS);
 -    Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
 -    // log.debug("markers " + markers);
 -    assertEquals("one tablet should have markers", 1, markers.keySet().size());
 -    assertEquals("tableId of the keyExtent should be 1", "1",
 -        markers.keySet().iterator().next().getTableId());
 +      // wait for the metadata table to go back online
 +      getRecoveryMarkers(c);
 +      // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
 +      sleepUninterruptibly(5, TimeUnit.SECONDS);
 +      Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
 +      // log.debug("markers " + markers);
 +      assertEquals("one tablet should have markers", 1, markers.keySet().size());
 +      assertEquals("tableId of the keyExtent should be 1", "1",
 +          markers.keySet().iterator().next().getTableId().canonicalID());
  
 -    // put some data in the WAL
 -    assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
 -    verifySomeData(c, tableName, 1001 * 50 + 1);
 -    writeSomeData(c, tableName, 100, 100);
 +      // put some data in the WAL
 +      assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
 +      verifySomeData(c, tableName, 1001 * 50 + 1);
 +      writeSomeData(c, tableName, 100, 100);
  
-       Map<String,Boolean> walsAfterRestart = getWALsAndAssertCount(context, 4);
 -    Map<String,WalState> walsAfterRestart = getWALsAndAssertCount(c, 4);
 -    // log.debug("wals after " + walsAfterRestart);
 -    assertEquals("used WALs after restart should be 4", 4, countInUse(walsAfterRestart.values()));
 -    control.start(GARBAGE_COLLECTOR);
 -    sleepUninterruptibly(5, TimeUnit.SECONDS);
 -    Map<String,WalState> walsAfterRestartAndGC = getWALsAndAssertCount(c, 2);
 -    assertEquals("logs in use should be 2", 2, countInUse(walsAfterRestartAndGC.values()));
++      Map<String,WalState> walsAfterRestart = getWALsAndAssertCount(context, 4);
 +      // log.debug("wals after " + walsAfterRestart);
-       assertEquals("used WALs after restart should be 4", 4, countTrue(walsAfterRestart.values()));
++      assertEquals("used WALs after restart should be 4", 4, countInUse(walsAfterRestart.values()));
 +      control.start(GARBAGE_COLLECTOR);
 +      sleepUninterruptibly(5, TimeUnit.SECONDS);
-       Map<String,Boolean> walsAfterRestartAndGC = getWALsAndAssertCount(context, 2);
-       assertEquals("logs in use should be 2", 2, countTrue(walsAfterRestartAndGC.values()));
++      Map<String,WalState> walsAfterRestartAndGC = getWALsAndAssertCount(context, 2);
++      assertEquals("logs in use should be 2", 2, countInUse(walsAfterRestartAndGC.values()));
 +    }
    }
  
 -  private void verifySomeData(Connector c, String tableName, int expected) throws Exception {
 -    Scanner scan = c.createScanner(tableName, EMPTY);
 -    int result = Iterators.size(scan.iterator());
 -    scan.close();
 -    assertEquals(expected, result);
 +  private void verifySomeData(AccumuloClient c, String tableName, int expected) throws Exception {
 +    try (Scanner scan = c.createScanner(tableName, EMPTY)) {
 +      int result = Iterators.size(scan.iterator());
 +      assertEquals(expected, result);
 +    }
    }
  
 -  private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
 -    Random rand = new Random();
 -    BatchWriter bw = conn.createBatchWriter(tableName, null);
 +  private void writeSomeData(AccumuloClient client, String tableName, int row, int col)
 +      throws Exception {
 +    Random rand = new SecureRandom();
 +    BatchWriter bw = client.createBatchWriter(tableName, null);
      byte[] rowData = new byte[10];
      byte[] cq = new byte[10];
      byte[] value = new byte[10];
@@@ -224,7 -220,7 +224,7 @@@
    private final int TIMES_TO_COUNT = 20;
    private final int PAUSE_BETWEEN_COUNTS = 100;
  
-   private Map<String,Boolean> getWALsAndAssertCount(ServerContext c, int expectedCount)
 -  private Map<String,WalState> getWALsAndAssertCount(Connector c, int expectedCount)
++  private Map<String,WalState> getWALsAndAssertCount(ServerContext c, int expectedCount)
        throws Exception {
      // see https://issues.apache.org/jira/browse/ACCUMULO-4110. Sometimes this test counts the logs
      // before
@@@ -262,14 -258,27 +262,24 @@@
      return waitLonger;
    }
  
-   private Map<String,Boolean> _getWals(ServerContext c) throws Exception {
-     Map<String,Boolean> result = new HashMap<>();
-     WalStateManager wals = new WalStateManager(c);
-     for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
-       // WALs are in use if they are not unreferenced
-       result.put(entry.getKey().toString(), entry.getValue() != WalState.UNREFERENCED);
 -  static Map<String,WalState> _getWals(Connector c) throws Exception {
++  static Map<String,WalState> _getWals(ServerContext c) throws Exception {
+     while (true) {
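+       // WAL markers can disappear while being read; retry when that surfaces as a NoNodeException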
+       try {
+         Map<String,WalState> result = new HashMap<>();
 -        Instance i = c.getInstance();
 -        ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(),
 -            "");
 -        WalStateManager wals = new WalStateManager(c.getInstance(), zk);
++        WalStateManager wals = new WalStateManager(c);
+         for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+           // record the current state of each WAL (OPEN, CLOSED, or UNREFERENCED)
+           result.put(entry.getKey().toString(), entry.getValue());
+         }
+         return result;
+       } catch (WalMarkerException wme) {
+         if (wme.getCause() instanceof NoNodeException) {
+           log.debug("WALs changed while reading, retrying", wme);
+         } else {
+           throw wme;
+         }
+       }
      }
-     return result;
    }
  
  }

