accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [3/4] accumulo git commit: ACCUMULO-3452 Enables KRB authentication to proxy and impersonation.
Date Wed, 21 Jan 2015 23:36:52 GMT
ACCUMULO-3452 Enables KRB authentication to proxy and impersonation.

The first part introduces Kerberos (KRB) authentication between clients
and the proxy. To correctly pass client credentials through to Accumulo,
we also need to introduce some basic impersonation, which is controlled
via static rules defined in the site configuration.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/98ced20a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/98ced20a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/98ced20a

Branch: refs/heads/master
Commit: 98ced20a3650bc31a6de4e2e6f79c6e1c7c121cb
Parents: 0a799e3
Author: Josh Elser <elserj@apache.org>
Authored: Tue Jan 20 12:22:32 2015 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Wed Jan 21 18:24:38 2015 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../core/rpc/SaslConnectionParamsTest.java      |   2 +-
 .../java/org/apache/accumulo/proxy/Proxy.java   | 151 ++++++--
 .../org/apache/accumulo/proxy/ProxyServer.java  |  27 +-
 .../accumulo/server/metrics/MetricsFactory.java |   8 +-
 .../TCredentialsUpdatingInvocationHandler.java  |  32 +-
 .../server/rpc/TCredentialsUpdatingWrapper.java |   6 +-
 .../accumulo/server/rpc/TServerUtils.java       |  15 +-
 .../accumulo/server/rpc/ThriftServerType.java   |   4 +
 .../accumulo/server/rpc/TimedProcessor.java     |   6 +-
 .../server/rpc/UGIAssumingProcessor.java        |  10 +-
 .../server/security/UserImpersonation.java      | 223 ++++++++++++
 .../security/handler/KerberosAuthenticator.java |  27 +-
 ...redentialsUpdatingInvocationHandlerTest.java |  92 ++++-
 .../server/security/UserImpersonationTest.java  | 273 ++++++++++++++
 .../accumulo/gc/SimpleGarbageCollector.java     |   2 +-
 .../java/org/apache/accumulo/master/Master.java |   2 +-
 .../apache/accumulo/tserver/TabletServer.java   |   4 +-
 .../accumulo/proxy/ProxyDurabilityIT.java       |  11 +-
 .../apache/accumulo/proxy/SimpleProxyIT.java    |  20 +-
 .../proxy/TestProxyInstanceOperations.java      |   5 +-
 .../accumulo/proxy/TestProxyReadWrite.java      |   5 +-
 .../proxy/TestProxySecurityOperations.java      |   5 +-
 .../proxy/TestProxyTableOperations.java         |   5 +-
 .../test/functional/KerberosProxyIT.java        | 362 +++++++++++++++++++
 25 files changed, 1199 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ad96680..68fac73 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -158,6 +158,8 @@ public enum Property {
    */
   INSTANCE_RPC_SASL_ENABLED("instance.rpc.sasl.enabled", "false", PropertyType.BOOLEAN,
       "Configures Thrift RPCs to require SASL with GSSAPI which supports Kerberos authentication. Mutually exclusive with SSL RPC configuration."),
+  INSTANCE_RPC_SASL_PROXYUSERS("instance.rpc.sasl.impersonation.", null, PropertyType.PREFIX,
+      "Prefix that allows configuration of users that are allowed to impersonate other users"),
 
   // general properties
   GENERAL_PREFIX("general.", null, PropertyType.PREFIX,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java b/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
index 673126c..1f80d50 100644
--- a/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
@@ -51,7 +51,7 @@ public class SaslConnectionParamsTest {
 
   @Test
   public void testNullParams() {
-    ClientConfiguration clientConf = ClientConfiguration.loadDefault();
+    ClientConfiguration clientConf = new ClientConfiguration();
     AccumuloConfiguration rpcConf = ClientContext.convertClientConfig(clientConf);
     assertEquals("false", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
     assertNull(SaslConnectionParams.forConfig(rpcConf));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 81509ee..98dc3ee 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -20,32 +20,61 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.Constructor;
+import java.net.InetAddress;
 import java.util.Properties;
 
 import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.proxy.thrift.AccumuloProxy;
+import org.apache.accumulo.server.metrics.MetricsFactory;
 import org.apache.accumulo.server.rpc.RpcWrapper;
-import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.rpc.TimedProcessor;
+import org.apache.accumulo.server.rpc.UGIAssumingProcessor;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
 
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 import com.google.common.io.Files;
+import com.google.common.net.HostAndPort;
 
 public class Proxy {
 
   private static final Logger log = Logger.getLogger(Proxy.class);
 
+  public static final String USE_MINI_ACCUMULO_KEY = "useMiniAccumulo";
+  public static final String USE_MINI_ACCUMULO_DEFAULT = "false";
+  public static final String USE_MOCK_INSTANCE_KEY = "useMockInstance";
+  public static final String USE_MOCK_INSTANCE_DEFAULT = "false";
+  public static final String ACCUMULO_INSTANCE_NAME_KEY = "instance";
+  public static final String ZOOKEEPERS_KEY = "zookeepers";
+  public static final String THRIFT_THREAD_POOL_SIZE_KEY = "numThreads";
+  // Default number of threads from THsHaServer.Args
+  public static final String THRIFT_THREAD_POOL_SIZE_DEFAULT = "5";
+  public static final String THRIFT_MAX_FRAME_SIZE_KEY = "maxFrameSize";
+  public static final String THRIFT_MAX_FRAME_SIZE_DEFAULT = "16M";
+
+  // Type of thrift server to create
+  public static final String THRIFT_SERVER_TYPE = "thriftServerType";
+  public static final String THRIFT_SERVER_TYPE_DEFAULT = "";
+  public static final ThriftServerType DEFAULT_SERVER_TYPE = ThriftServerType.getDefault();
+
+  public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+  public static final String KERBEROS_KEYTAB = "kerberosKeytab";
+
   public static class PropertiesConverter implements IStringConverter<Properties> {
     @Override
     public Properties convert(String fileName) {
@@ -74,10 +103,10 @@ public class Proxy {
     Opts opts = new Opts();
     opts.parseArgs(Proxy.class.getName(), args);
 
-    boolean useMini = Boolean.parseBoolean(opts.prop.getProperty("useMiniAccumulo", "false"));
-    boolean useMock = Boolean.parseBoolean(opts.prop.getProperty("useMockInstance", "false"));
-    String instance = opts.prop.getProperty("instance");
-    String zookeepers = opts.prop.getProperty("zookeepers");
+    boolean useMini = Boolean.parseBoolean(opts.prop.getProperty(USE_MINI_ACCUMULO_KEY, USE_MINI_ACCUMULO_DEFAULT));
+    boolean useMock = Boolean.parseBoolean(opts.prop.getProperty(USE_MOCK_INSTANCE_KEY, USE_MOCK_INSTANCE_DEFAULT));
+    String instance = opts.prop.getProperty(ACCUMULO_INSTANCE_NAME_KEY);
+    String zookeepers = opts.prop.getProperty(ZOOKEEPERS_KEY);
 
     if (!useMini && !useMock && instance == null) {
       System.err.println("Properties file must contain one of : useMiniAccumulo=true, useMockInstance=true, or instance=<instance name>");
@@ -118,34 +147,94 @@ public class Proxy {
 
     Class<? extends TProtocolFactory> protoFactoryClass = Class.forName(opts.prop.getProperty("protocolFactory", TCompactProtocol.Factory.class.getName()))
         .asSubclass(TProtocolFactory.class);
+    TProtocolFactory protoFactory = protoFactoryClass.newInstance();
     int port = Integer.parseInt(opts.prop.getProperty("port"));
-    TServer server = createProxyServer(AccumuloProxy.class, ProxyServer.class, port, protoFactoryClass, opts.prop);
-    server.serve();
+    HostAndPort address = HostAndPort.fromParts(InetAddress.getLocalHost().getCanonicalHostName(), port);
+    ServerAddress server = createProxyServer(address, protoFactory, opts.prop);
+    // Wait for the server to come up
+    while (!server.server.isServing()) {
+      Thread.sleep(100);
+    }
+    log.info("Proxy server started on " + server.getAddress());
+    while (server.server.isServing()) {
+      Thread.sleep(1000);
+    }
   }
 
-  public static TServer createProxyServer(Class<?> api, Class<?> implementor, final int port, Class<? extends TProtocolFactory> protoClass,
-      Properties properties) throws Exception {
-    final TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
+  public static ServerAddress createProxyServer(HostAndPort address, TProtocolFactory protocolFactory, Properties properties) throws Exception {
+    final int numThreads = Integer.parseInt(properties.getProperty(THRIFT_THREAD_POOL_SIZE_KEY, THRIFT_THREAD_POOL_SIZE_DEFAULT));
+    final long maxFrameSize = AccumuloConfiguration.getMemoryInBytes(properties.getProperty(THRIFT_MAX_FRAME_SIZE_KEY, THRIFT_MAX_FRAME_SIZE_DEFAULT));
+    final int simpleTimerThreadpoolSize = Integer.parseInt(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE.getDefaultValue());
+    // How frequently to try to resize the thread pool
+    final long threadpoolResizeInterval = 1000l * 5;
+    // No timeout
+    final long serverSocketTimeout = 0l;
+    // Use the new hadoop metrics2 support
+    final MetricsFactory metricsFactory = new MetricsFactory(false);
+    final String serverName = "Proxy", threadName = "Accumulo Thrift Proxy";
+
+    // create the implementation of the proxy interface
+    ProxyServer impl = new ProxyServer(properties);
+
+    // Wrap the implementation -- translate some exceptions
+    AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl);
+
+    // Create the processor from the implementation
+    TProcessor processor = new AccumuloProxy.Processor<AccumuloProxy.Iface>(wrappedImpl);
+
+    // Get the type of thrift server to instantiate
+    final String serverTypeStr = properties.getProperty(THRIFT_SERVER_TYPE, THRIFT_SERVER_TYPE_DEFAULT);
+    ThriftServerType serverType = DEFAULT_SERVER_TYPE;
+    if (!THRIFT_SERVER_TYPE_DEFAULT.equals(serverTypeStr)) {
+      serverType = ThriftServerType.get(serverTypeStr);
+    }
 
-    // create the implementor
-    Object impl = implementor.getConstructor(Properties.class).newInstance(properties);
+    ClientConfiguration clientConf = ClientConfiguration.loadDefault();
+    SslConnectionParams sslParams = null;
+    SaslConnectionParams saslParams = null;
+    switch (serverType) {
+      case SSL:
+        sslParams = SslConnectionParams.forClient(ClientContext.convertClientConfig(clientConf));
+        break;
+      case SASL:
+        saslParams = SaslConnectionParams.forConfig(clientConf);
+        if (null == saslParams) {
+          log.fatal("SASL thrift server was requested but it is disabled in client configuration");
+          throw new RuntimeException();
+        }
 
-    Class<?> proxyProcClass = Class.forName(api.getName() + "$Processor");
-    Class<?> proxyIfaceClass = Class.forName(api.getName() + "$Iface");
+        // Kerberos needs to be enabled to use it
+        if (!UserGroupInformation.isSecurityEnabled()) {
+          log.fatal("Hadoop security is not enabled");
+          throw new RuntimeException();
+        }
+
+        // Login via principal and keytab
+        final String kerberosPrincipal = properties.getProperty(KERBEROS_PRINCIPAL, ""),
+        kerberosKeytab = properties.getProperty(KERBEROS_KEYTAB, "");
+        if (StringUtils.isBlank(kerberosPrincipal) || StringUtils.isBlank(kerberosKeytab)) {
+          log.fatal("Kerberos principal and keytab must be provided");
+          throw new RuntimeException();
+        }
+        UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytab);
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        log.info("Logged in as " + ugi.getUserName());
+
+        processor = new UGIAssumingProcessor(processor);
+
+        break;
+      default:
+        // nothing to do -- no extra configuration necessary
+        break;
+    }
 
-    @SuppressWarnings("unchecked")
-    Constructor<? extends TProcessor> proxyProcConstructor = (Constructor<? extends TProcessor>) proxyProcClass.getConstructor(proxyIfaceClass);
+    // Hook up support for timing metrics for thrift calls
+    TimedProcessor timedProcessor = new TimedProcessor(metricsFactory, processor, serverName, threadName);
 
-    final TProcessor processor = proxyProcConstructor.newInstance(TCredentialsUpdatingWrapper.service(RpcWrapper.service(impl), impl.getClass()));
+    // Create the thrift server with our processor and properties
+    ServerAddress serverAddr = TServerUtils.startTServer(address, serverType, timedProcessor, serverName, threadName, numThreads, simpleTimerThreadpoolSize,
+        threadpoolResizeInterval, maxFrameSize, sslParams, saslParams, serverSocketTimeout);
 
-    THsHaServer.Args args = new THsHaServer.Args(socket);
-    args.processor(processor);
-    final long maxFrameSize = AccumuloConfiguration.getMemoryInBytes(properties.getProperty("maxFrameSize", "16M"));
-    if (maxFrameSize > Integer.MAX_VALUE)
-      throw new RuntimeException(maxFrameSize + " is larger than MAX_INT");
-    args.transportFactory(new TFramedTransport.Factory((int) maxFrameSize));
-    args.protocolFactory(protoClass.newInstance());
-    args.maxReadBufferBytes = maxFrameSize;
-    return new THsHaServer(args);
+    return serverAddr;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
index 5d460b5..8499044 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
@@ -104,6 +104,8 @@ import org.apache.accumulo.proxy.thrift.ScanType;
 import org.apache.accumulo.proxy.thrift.UnknownScanner;
 import org.apache.accumulo.proxy.thrift.UnknownWriter;
 import org.apache.accumulo.proxy.thrift.WriterOptions;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.rpc.UGIAssumingProcessor;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
@@ -175,6 +177,8 @@ public class ProxyServer implements AccumuloProxy.Iface {
   protected Cache<UUID,BatchWriterPlusException> writerCache;
   protected Cache<UUID,ConditionalWriter> conditionalWriterCache;
 
+  private final ThriftServerType serverType;
+
   public ProxyServer(Properties props) {
 
     String useMock = props.getProperty("useMockInstance");
@@ -191,6 +195,14 @@ public class ProxyServer implements AccumuloProxy.Iface {
       throw new RuntimeException(e);
     }
 
+    final String serverTypeStr = props.getProperty(Proxy.THRIFT_SERVER_TYPE, Proxy.THRIFT_SERVER_TYPE_DEFAULT);
+    ThriftServerType tempServerType = Proxy.DEFAULT_SERVER_TYPE;
+    if (!Proxy.THRIFT_SERVER_TYPE_DEFAULT.equals(serverTypeStr)) {
+      tempServerType = ThriftServerType.get(serverTypeStr);
+    }
+
+    serverType = tempServerType;
+
     scannerCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).removalListener(new CloseScanner()).build();
 
     writerCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).removalListener(new CloseWriter()).build();
@@ -1526,14 +1538,24 @@ public class ProxyServer implements AccumuloProxy.Iface {
 
   @Override
   public ByteBuffer login(String principal, Map<String,String> loginProperties) throws org.apache.accumulo.proxy.thrift.AccumuloSecurityException, TException {
+    if (ThriftServerType.SASL == serverType) {
+      String remoteUser = UGIAssumingProcessor.rpcPrincipal();
+      if (null == remoteUser || !remoteUser.equals(principal)) {
+        logger.error("Denying login from user " + remoteUser + " who attempted to log in as " + principal);
+        throw new org.apache.accumulo.proxy.thrift.AccumuloSecurityException("RPC principal did not match requested Accumulo principal");
+      }
+    }
+
     try {
       AuthenticationToken token = getToken(principal, loginProperties);
       ByteBuffer login = ByteBuffer.wrap((instance.getInstanceID() + "," + new Credentials(principal, token).serialize()).getBytes(UTF_8));
       getConnector(login); // check to make sure user exists
       return login;
     } catch (AccumuloSecurityException e) {
+      logger.error("Failed to login", e);
       throw new org.apache.accumulo.proxy.thrift.AccumuloSecurityException(e.toString());
     } catch (Exception e) {
+      logger.error("Failed to login", e);
       throw new TException(e);
     }
   }
@@ -1544,9 +1566,8 @@ public class ProxyServer implements AccumuloProxy.Iface {
     AuthenticationToken token;
     try {
       token = tokenClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new AccumuloException(e);
-    } catch (IllegalAccessException e) {
+    } catch (InstantiationException | IllegalAccessException e) {
+      logger.error("Error constructing authentication token", e);
       throw new AccumuloException(e);
     }
     token.init(props);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsFactory.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsFactory.java
index ce2ae31..aadf830 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsFactory.java
@@ -20,8 +20,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.hadoop.metrics2.MetricsSystem;
 
-import com.google.common.base.Preconditions;
-
 /**
  *
  */
@@ -31,9 +29,11 @@ public class MetricsFactory {
   private final MetricsSystem metricsSystem;
 
   public MetricsFactory(AccumuloConfiguration conf) {
-    Preconditions.checkNotNull(conf);
-    useOldMetrics = conf.getBoolean(Property.GENERAL_LEGACY_METRICS);
+    this(conf.getBoolean(Property.GENERAL_LEGACY_METRICS));
+  }
 
+  public MetricsFactory(boolean useOldMetrics) {
+    this.useOldMetrics = useOldMetrics;
     if (useOldMetrics) {
       metricsSystem = null;
     } else {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
index f8400e2..f85505d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
@@ -25,9 +25,11 @@ import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
-import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
+import org.apache.accumulo.server.security.UserImpersonation;
+import org.apache.accumulo.server.security.UserImpersonation.UsersWithHosts;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,9 +42,11 @@ public class TCredentialsUpdatingInvocationHandler<I> implements InvocationHandl
 
   private static final ConcurrentHashMap<String,Class<? extends AuthenticationToken>> TOKEN_CLASS_CACHE = new ConcurrentHashMap<>();
   private final I instance;
+  private final UserImpersonation impersonation;
 
-  protected TCredentialsUpdatingInvocationHandler(final I serverInstance) {
+  protected TCredentialsUpdatingInvocationHandler(final I serverInstance, AccumuloConfiguration conf) {
     instance = serverInstance;
+    impersonation = new UserImpersonation(conf);
   }
 
   @Override
@@ -85,7 +89,7 @@ public class TCredentialsUpdatingInvocationHandler<I> implements InvocationHandl
     }
 
     // The Accumulo principal extracted from the SASL transport
-    final String principal = UGIAssumingProcessor.currentPrincipal();
+    final String principal = UGIAssumingProcessor.rpcPrincipal();
 
     if (null == principal) {
       log.debug("Found KerberosToken in TCredentials, but did not receive principal from SASL processor");
@@ -94,12 +98,28 @@ public class TCredentialsUpdatingInvocationHandler<I> implements InvocationHandl
 
     // The principal from the SASL transport should match what the user requested as their Accumulo principal
     if (!principal.equals(tcreds.principal)) {
-      final String msg = "Principal in credentials object should match kerberos principal. Expected '" + principal + "' but was '" + tcreds.principal + "'";
-      log.warn(msg);
-      throw new ThriftSecurityException(msg, SecurityErrorCode.BAD_CREDENTIALS);
+      UsersWithHosts usersWithHosts = impersonation.get(principal);
+      if (null == usersWithHosts) {
+        principalMismatch(principal, tcreds.principal);
+      }
+      if (!usersWithHosts.getUsers().contains(tcreds.principal)) {
+        principalMismatch(principal, tcreds.principal);
+      }
+      String clientAddr = TServerUtils.clientAddress.get();
+      if (!usersWithHosts.getHosts().contains(clientAddr)) {
+        final String msg = "Principal in credentials object allowed mismatched Kerberos principals, but not on " + clientAddr;
+        log.warn(msg);
+        throw new ThriftSecurityException(msg, SecurityErrorCode.BAD_CREDENTIALS);
+      }
     }
   }
 
+  protected void principalMismatch(String expected, String actual) throws ThriftSecurityException {
+    final String msg = "Principal in credentials object should match kerberos principal. Expected '" + expected + "' but was '" + actual + "'";
+    log.warn(msg);
+    throw new ThriftSecurityException(msg, SecurityErrorCode.BAD_CREDENTIALS);
+  }
+
   protected Class<? extends AuthenticationToken> getTokenClassFromName(String tokenClassName) {
     Class<? extends AuthenticationToken> typedClz = TOKEN_CLASS_CACHE.get(tokenClassName);
     if (null == typedClz) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java
index 4621d36..698cf9e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingWrapper.java
@@ -19,6 +19,8 @@ package org.apache.accumulo.server.rpc;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+
 /**
  * Utility method to ensure that the instance of TCredentials which is passed to the implementation of a Thrift service has the correct principal from SASL at
  * the Thrift transport layer when SASL/GSSAPI (kerberos) is enabled. This ensures that we use the strong authentication provided to us and disallow any other
@@ -26,8 +28,8 @@ import java.lang.reflect.Proxy;
  */
 public class TCredentialsUpdatingWrapper {
 
-  public static <T> T service(final T instance, final Class<? extends T> originalClass) {
-    InvocationHandler handler = new TCredentialsUpdatingInvocationHandler<T>(instance);
+  public static <T> T service(final T instance, final Class<? extends T> originalClass, AccumuloConfiguration conf) {
+    InvocationHandler handler = new TCredentialsUpdatingInvocationHandler<T>(instance, conf);
 
     @SuppressWarnings("unchecked")
     T proxiedInstance = (T) Proxy.newProxyInstance(originalClass.getClassLoader(), originalClass.getInterfaces(), handler);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 59ae01f..6a1adda 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -43,7 +43,6 @@ import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.hadoop.security.SaslRpcServer;
@@ -149,9 +148,8 @@ public class TServerUtils {
           port = 1024 + port % (65535 - 1024);
         try {
           HostAndPort addr = HostAndPort.fromParts(hostname, port);
-          return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads,
-              simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize,
-              service.getServerSslParams(), service.getServerSaslParams(), service.getClientTimeoutInMillis());
+          return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads, simpleTimerThreadpoolSize,
+              timeBetweenThreadChecks, maxMessageSize, service.getServerSslParams(), service.getServerSaslParams(), service.getClientTimeoutInMillis());
         } catch (TTransportException ex) {
           log.error("Unable to start TServer", ex);
           if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
@@ -390,8 +388,7 @@ public class TServerUtils {
     }
 
     return new ServerAddress(new TThreadPoolServer(new TThreadPoolServer.Args(transport).transportFactory(ugiTransportFactory)
-        .processorFactory(clientInfoFactory)
-        .protocolFactory(ThriftUtil.protocolFactory())), address);
+        .processorFactory(clientInfoFactory).protocolFactory(ThriftUtil.protocolFactory())), address);
   }
 
   public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor,
@@ -411,9 +408,9 @@ public class TServerUtils {
    *
    * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
    */
-  public static ServerAddress startTServer(HostAndPort address,ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName, int numThreads,
-      int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize,  SslConnectionParams sslParams,
-      SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+  public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName,
+      int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslConnectionParams saslParams,
+      long serverSocketTimeout) throws TTransportException {
 
     // This is presently not supported. It's hypothetically possible, I believe, to work, but it would require changes in how the transports
     // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's quality of protection addresses privacy issues.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
index 60d5402..6c0bcf9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
@@ -46,4 +46,8 @@ public enum ThriftServerType {
   public String toString() {
     return name;
   }
+
+  public static ThriftServerType getDefault() {
+    return CUSTOM_HS_HA;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
index a842572..2c788c6 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
@@ -37,9 +37,11 @@ public class TimedProcessor implements TProcessor {
   private long idleStart = 0;
 
   public TimedProcessor(AccumuloConfiguration conf, TProcessor next, String serverName, String threadName) {
+    this(new MetricsFactory(conf), next, serverName, threadName);
+  }
+
+  public TimedProcessor(MetricsFactory factory, TProcessor next, String serverName, String threadName) {
     this.other = next;
-    // Register the metrics MBean
-    MetricsFactory factory = new MetricsFactory(conf);
     metrics = factory.createThriftMetrics(serverName, threadName);
     try {
       metrics.register();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
index d5787a3..ab106a6 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 public class UGIAssumingProcessor implements TProcessor {
   private static final Logger log = LoggerFactory.getLogger(UGIAssumingProcessor.class);
 
-  public static final ThreadLocal<String> principal = new ThreadLocal<String>();
+  public static final ThreadLocal<String> rpcPrincipal = new ThreadLocal<String>();
   private final TProcessor wrapped;
   private final UserGroupInformation loginUser;
 
@@ -56,8 +56,8 @@ public class UGIAssumingProcessor implements TProcessor {
   /**
    * The principal of the user who authenticated over SASL.
    */
-  public static String currentPrincipal() {
-    return principal.get();
+  public static String rpcPrincipal() {
+    return rpcPrincipal.get();
   }
 
   @Override
@@ -78,13 +78,13 @@ public class UGIAssumingProcessor implements TProcessor {
 
     try {
       // Set the principal in the ThreadLocal for access to get authorizations
-      principal.set(remoteUser);
+      rpcPrincipal.set(remoteUser);
 
       return wrapped.process(inProt, outProt);
     } finally {
       // Unset the principal after we're done using it just to be sure that it's not incorrectly
       // used in the same thread down the line.
-      principal.set(null);
+      rpcPrincipal.set(null);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/main/java/org/apache/accumulo/server/security/UserImpersonation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/UserImpersonation.java b/server/base/src/main/java/org/apache/accumulo/server/security/UserImpersonation.java
new file mode 100644
index 0000000..b634f0e
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/UserImpersonation.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class UserImpersonation {
+
+  private static final Logger log = LoggerFactory.getLogger(UserImpersonation.class);
+  private static final Set<String> ALWAYS_TRUE = new AlwaysTrueSet<>();
+  private static final String ALL = "*", USERS = "users", HOSTS = "hosts";
+
+  public static class AlwaysTrueSet<T> implements Set<T> {
+
+    @Override
+    public int size() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean contains(Object o) {
+      return true;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object[] toArray() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <E> E[] toArray(E[] a) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean add(T e) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+      return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends T> c) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public static class UsersWithHosts {
+    private Set<String> users = new HashSet<>(), hosts = new HashSet<>();
+    private boolean allUsers, allHosts;
+
+    public UsersWithHosts() {
+      allUsers = allHosts = false;
+    }
+
+    public UsersWithHosts(Set<String> users, Set<String> hosts) {
+      this();
+      this.users = users;
+      this.hosts = hosts;
+    }
+
+    public Set<String> getUsers() {
+      if (allUsers) {
+        return ALWAYS_TRUE;
+      }
+      return users;
+    }
+
+    public Set<String> getHosts() {
+      if (allHosts) {
+        return ALWAYS_TRUE;
+      }
+      return hosts;
+    }
+
+    public boolean acceptsAllUsers() {
+      return allUsers;
+    }
+
+    public void setAcceptAllUsers(boolean allUsers) {
+      this.allUsers = allUsers;
+    }
+
+    public boolean acceptsAllHosts() {
+      return allHosts;
+    }
+
+    public void setAcceptAllHosts(boolean allHosts) {
+      this.allHosts = allHosts;
+    }
+
+    public void setUsers(Set<String> users) {
+      this.users = users;
+      allUsers = false;
+    }
+
+    public void setHosts(Set<String> hosts) {
+      this.hosts = hosts;
+      allHosts = false;
+    }
+  }
+
+  private final Map<String,UsersWithHosts> proxyUsers;
+
+  public UserImpersonation(AccumuloConfiguration conf) {
+    Map<String,String> entries = conf.getAllPropertiesWithPrefix(Property.INSTANCE_RPC_SASL_PROXYUSERS);
+    proxyUsers = new HashMap<>();
+    final String configKey = Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey();
+    for (Entry<String,String> entry : entries.entrySet()) {
+      String aclKey = entry.getKey().substring(configKey.length());
+      int index = aclKey.lastIndexOf('.');
+
+      if (-1 == index) {
+        throw new RuntimeException("Expected 2 elements in key suffix: " + aclKey);
+      }
+
+      final String remoteUser = aclKey.substring(0, index).trim(), usersOrHosts = aclKey.substring(index + 1).trim();
+      UsersWithHosts usersWithHosts = proxyUsers.get(remoteUser);
+      if (null == usersWithHosts) {
+        usersWithHosts = new UsersWithHosts();
+        proxyUsers.put(remoteUser, usersWithHosts);
+      }
+
+      if (USERS.equals(usersOrHosts)) {
+        String userString = entry.getValue().trim();
+        if (ALL.equals(userString)) {
+          usersWithHosts.setAcceptAllUsers(true);
+        } else if (!usersWithHosts.acceptsAllUsers()) {
+          Set<String> users = usersWithHosts.getUsers();
+          if (null == users) {
+            users = new HashSet<>();
+            usersWithHosts.setUsers(users);
+          }
+          String[] userValues = StringUtils.split(userString, ',');
+          users.addAll(Arrays.<String> asList(userValues));
+        }
+      } else if (HOSTS.equals(usersOrHosts)) {
+        String hostsString = entry.getValue().trim();
+        if (ALL.equals(hostsString)) {
+          usersWithHosts.setAcceptAllHosts(true);
+        } else if (!usersWithHosts.acceptsAllHosts()) {
+          Set<String> hosts = usersWithHosts.getHosts();
+          if (null == hosts) {
+            hosts = new HashSet<>();
+            usersWithHosts.setHosts(hosts);
+          }
+          String[] hostValues = StringUtils.split(hostsString, ',');
+          hosts.addAll(Arrays.<String> asList(hostValues));
+        }
+      } else {
+        log.debug("Ignoring key " + aclKey);
+      }
+    }
+  }
+
+  public UsersWithHosts get(String remoteUser) {
+    return proxyUsers.get(remoteUser);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
index 61b8db0..08fa55b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
@@ -28,13 +28,17 @@ import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.Base64;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.rpc.UGIAssumingProcessor;
 import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
-import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
+import org.apache.accumulo.server.security.UserImpersonation;
+import org.apache.accumulo.server.security.UserImpersonation.UsersWithHosts;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.zookeeper.KeeperException;
@@ -53,9 +57,15 @@ public class KerberosAuthenticator implements Authenticator {
   private final ZKAuthenticator zkAuthenticator = new ZKAuthenticator();
   private String zkUserPath;
   private final ZooCache zooCache;
+  private final UserImpersonation impersonation;
 
   public KerberosAuthenticator() {
-    zooCache = new ZooCache();
+    this(new ZooCache(), SiteConfiguration.getInstance());
+  }
+
+  public KerberosAuthenticator(ZooCache cache, AccumuloConfiguration conf) {
+    this.zooCache = cache;
+    this.impersonation = new UserImpersonation(conf);
   }
 
   @Override
@@ -104,11 +114,20 @@ public class KerberosAuthenticator implements Authenticator {
 
   @Override
   public boolean authenticateUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {
-    final String rpcPrincipal = UGIAssumingProcessor.currentPrincipal();
+    final String rpcPrincipal = UGIAssumingProcessor.rpcPrincipal();
 
     if (!rpcPrincipal.equals(principal)) {
       // KerberosAuthenticator can't perform this check because KerberosToken is just a shim and doesn't contain the actual credentials
-      throw new AccumuloSecurityException(principal, SecurityErrorCode.AUTHENTICATOR_FAILED);
+      // Double check that the rpc user can impersonate as the requested user.
+      UsersWithHosts usersWithHosts = impersonation.get(rpcPrincipal);
+      if (null == usersWithHosts) {
+        throw new AccumuloSecurityException(principal, SecurityErrorCode.AUTHENTICATOR_FAILED);
+      }
+      if (!usersWithHosts.getUsers().contains(principal)) {
+        throw new AccumuloSecurityException(principal, SecurityErrorCode.AUTHENTICATOR_FAILED);
+      }
+
+      log.debug("Allowing impersonation of {} by {}", principal, rpcPrincipal);
     }
 
     // User is authenticated at the transport layer -- nothing extra is necessary

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java
index aba1aa0..c2d182e 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java
@@ -24,8 +24,9 @@ import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -34,15 +35,17 @@ import org.junit.Test;
 public class TCredentialsUpdatingInvocationHandlerTest {
 
   TCredentialsUpdatingInvocationHandler<Object> proxy;
+  ConfigurationCopy conf;
 
   @Before
   public void setup() {
-    proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object());
+    conf = new ConfigurationCopy();
+    proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object(), conf);
   }
 
   @After
   public void teardown() {
-    UGIAssumingProcessor.principal.set(null);
+    UGIAssumingProcessor.rpcPrincipal.set(null);
   }
 
   @Test
@@ -61,7 +64,7 @@ public class TCredentialsUpdatingInvocationHandlerTest {
     ConcurrentHashMap<String,Class<? extends AuthenticationToken>> cache = proxy.getTokenCache();
     cache.clear();
     TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString());
-    UGIAssumingProcessor.principal.set(principal);
+    UGIAssumingProcessor.rpcPrincipal.set(principal);
     proxy.updateArgs(new Object[] {new Object(), tcreds});
     Assert.assertEquals(1, cache.size());
     Assert.assertEquals(KerberosToken.class, cache.get(KerberosToken.CLASS_NAME));
@@ -71,7 +74,7 @@ public class TCredentialsUpdatingInvocationHandlerTest {
   public void testMissingPrincipal() throws Exception {
     final String principal = "root";
     TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString());
-    UGIAssumingProcessor.principal.set(null);
+    UGIAssumingProcessor.rpcPrincipal.set(null);
     proxy.updateArgs(new Object[] {new Object(), tcreds});
   }
 
@@ -79,7 +82,7 @@ public class TCredentialsUpdatingInvocationHandlerTest {
   public void testMismatchedPrincipal() throws Exception {
     final String principal = "root";
     TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString());
-    UGIAssumingProcessor.principal.set(principal + "foobar");
+    UGIAssumingProcessor.rpcPrincipal.set(principal + "foobar");
     proxy.updateArgs(new Object[] {new Object(), tcreds});
   }
 
@@ -87,7 +90,82 @@ public class TCredentialsUpdatingInvocationHandlerTest {
   public void testWrongTokenType() throws Exception {
     final String principal = "root";
     TCredentials tcreds = new TCredentials(principal, PasswordToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString());
-    UGIAssumingProcessor.principal.set(principal);
+    UGIAssumingProcessor.rpcPrincipal.set(principal);
+    proxy.updateArgs(new Object[] {new Object(), tcreds});
+  }
+
+  @Test
+  public void testAllowedAnyImpersonationForAnyUser() throws Exception {
+    final String proxyServer = "proxy";
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + proxyServer + ".users", "*");
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + proxyServer + ".hosts", "*");
+    proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object(), conf);
+    TCredentials tcreds = new TCredentials("client", KerberosToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString());
+    UGIAssumingProcessor.rpcPrincipal.set(proxyServer);
+    proxy.updateArgs(new Object[] {new Object(), tcreds});
+  }
+
+  @Test
+  public void testAllowedImpersonationForSpecificUsers() throws Exception {
+    final String proxyServer = "proxy";
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + proxyServer + ".users", "client1,client2");
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + proxyServer + ".hosts", "*");
+    proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object(), conf);
+    TCredentials tcreds = new TCredentials("client1", KerberosToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString());
+    UGIAssumingProcessor.rpcPrincipal.set(proxyServer);
+    proxy.updateArgs(new Object[] {new Object(), tcreds});
+    tcreds = new TCredentials("client2", KerberosToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString());
+    proxy.updateArgs(new Object[] {new Object(), tcreds});
+  }
+
+  @Test(expected = ThriftSecurityException.class)
+  public void testDisallowedImpersonationForUser() throws Exception {
+    final String proxyServer = "proxy";
+    // let "otherproxy" impersonate, but not "proxy"
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + "otherproxy" + ".users", "*");
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + "otherproxy" + ".hosts", "*");
+    proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object(), conf);
+    TCredentials tcreds = new TCredentials("client", KerberosToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString());
+    UGIAssumingProcessor.rpcPrincipal.set(proxyServer);
+    proxy.updateArgs(new Object[] {new Object(), tcreds});
+  }
+
+  @Test(expected = ThriftSecurityException.class)
+  public void testDisallowedImpersonationForMultipleUsers() throws Exception {
+    final String proxyServer = "proxy";
+    // let "otherproxy" impersonate, but not "proxy"
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + "otherproxy1" + ".users", "*");
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + "otherproxy1" + ".hosts", "*");
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + "otherproxy2" + ".users", "client1,client2");
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + "otherproxy2" + ".hosts", "*");
+    proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object(), conf);
+    TCredentials tcreds = new TCredentials("client1", KerberosToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString());
+    UGIAssumingProcessor.rpcPrincipal.set(proxyServer);
+    proxy.updateArgs(new Object[] {new Object(), tcreds});
+  }
+
+  @Test
+  public void testAllowedImpersonationFromSpecificHost() throws Exception {
+    final String proxyServer = "proxy", client = "client", host = "host.domain.com";
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + proxyServer + ".users", client);
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + proxyServer + ".hosts", host);
+    proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object(), conf);
+    TCredentials tcreds = new TCredentials("client", KerberosToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString());
+    UGIAssumingProcessor.rpcPrincipal.set(proxyServer);
+    TServerUtils.clientAddress.set(host);
+    proxy.updateArgs(new Object[] {new Object(), tcreds});
+  }
+
+  @Test(expected = ThriftSecurityException.class)
+  public void testDisallowedImpersonationFromSpecificHost() throws Exception {
+    final String proxyServer = "proxy", client = "client", host = "host.domain.com";
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + proxyServer + ".users", client);
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + proxyServer + ".hosts", host);
+    proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object(), conf);
+    TCredentials tcreds = new TCredentials("client", KerberosToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString());
+    UGIAssumingProcessor.rpcPrincipal.set(proxyServer);
+    // The RPC came from a different host than is allowed
+    TServerUtils.clientAddress.set("otherhost.domain.com");
     proxy.updateArgs(new Object[] {new Object(), tcreds});
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/base/src/test/java/org/apache/accumulo/server/security/UserImpersonationTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/UserImpersonationTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/UserImpersonationTest.java
new file mode 100644
index 0000000..341073b
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/security/UserImpersonationTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.security.UserImpersonation.AlwaysTrueSet;
+import org.apache.accumulo.server.security.UserImpersonation.UsersWithHosts;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+
+/**
+ *
+ */
+public class UserImpersonationTest {
+
+  private ConfigurationCopy conf;
+
+  @Before
+  public void setup() {
+    conf = new ConfigurationCopy(new HashMap<String,String>());
+  }
+
+  void setValidHosts(String user, String hosts) {
+    setUsersOrHosts(user, ".hosts", hosts);
+  }
+
+  void setValidUsers(String user, String users) {
+    setUsersOrHosts(user, ".users", users);
+  }
+
+  void setUsersOrHosts(String user, String suffix, String value) {
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + user + suffix, value);
+  }
+
+  @Test
+  public void testAnyUserAndHosts() {
+    String server = "server";
+    setValidHosts(server, "*");
+    setValidUsers(server, "*");
+    UserImpersonation impersonation = new UserImpersonation(conf);
+
+    UsersWithHosts uwh = impersonation.get(server);
+    assertNotNull(uwh);
+
+    assertTrue(uwh.acceptsAllHosts());
+    assertTrue(uwh.acceptsAllUsers());
+
+    assertEquals(AlwaysTrueSet.class, uwh.getHosts().getClass());
+    assertEquals(AlwaysTrueSet.class, uwh.getUsers().getClass());
+  }
+
+  @Test
+  public void testNoHostByDefault() {
+    String server = "server";
+    setValidUsers(server, "*");
+    UserImpersonation impersonation = new UserImpersonation(conf);
+
+    UsersWithHosts uwh = impersonation.get(server);
+    assertNotNull(uwh);
+
+    assertFalse(uwh.acceptsAllHosts());
+    assertTrue(uwh.acceptsAllUsers());
+
+    assertNotEquals(AlwaysTrueSet.class, uwh.getHosts().getClass());
+    assertEquals(AlwaysTrueSet.class, uwh.getUsers().getClass());
+  }
+
+  @Test
+  public void testNoUsersByDefault() {
+    String server = "server";
+    setValidHosts(server, "*");
+    UserImpersonation impersonation = new UserImpersonation(conf);
+
+    UsersWithHosts uwh = impersonation.get(server);
+    assertNotNull(uwh);
+
+    assertTrue(uwh.acceptsAllHosts());
+    assertFalse(uwh.acceptsAllUsers());
+
+    assertEquals(AlwaysTrueSet.class, uwh.getHosts().getClass());
+    assertNotEquals(AlwaysTrueSet.class, uwh.getUsers().getClass());
+  }
+
+  @Test
+  public void testSingleUserAndHost() {
+    String server = "server", host = "single_host.domain.com", client = "single_client";
+    setValidHosts(server, host);
+    setValidUsers(server, client);
+    UserImpersonation impersonation = new UserImpersonation(conf);
+
+    UsersWithHosts uwh = impersonation.get(server);
+    assertNotNull(uwh);
+
+    assertFalse(uwh.acceptsAllHosts());
+    assertFalse(uwh.acceptsAllUsers());
+
+    assertNotEquals(AlwaysTrueSet.class, uwh.getHosts().getClass());
+    assertNotEquals(AlwaysTrueSet.class, uwh.getUsers().getClass());
+
+    assertTrue(uwh.getUsers().contains(client));
+    assertTrue(uwh.getHosts().contains(host));
+
+    assertFalse(uwh.getUsers().contains("some_other_user"));
+    assertFalse(uwh.getHosts().contains("other_host.domain.com"));
+  }
+
+  @Test
+  public void testMultipleExplicitUsers() {
+    String server = "server", client1 = "client1", client2 = "client2", client3 = "client3";
+    setValidHosts(server, "*");
+    setValidUsers(server, Joiner.on(',').join(client1, client2, client3));
+    UserImpersonation impersonation = new UserImpersonation(conf);
+
+    UsersWithHosts uwh = impersonation.get(server);
+    assertNotNull(uwh);
+
+    assertTrue(uwh.acceptsAllHosts());
+    assertFalse(uwh.acceptsAllUsers());
+
+    assertEquals(AlwaysTrueSet.class, uwh.getHosts().getClass());
+    assertNotEquals(AlwaysTrueSet.class, uwh.getUsers().getClass());
+
+    assertTrue(uwh.getUsers().contains(client1));
+    assertTrue(uwh.getUsers().contains(client2));
+    assertTrue(uwh.getUsers().contains(client3));
+    assertFalse(uwh.getUsers().contains("other_client"));
+  }
+
+  @Test
+  public void testMultipleExplicitHosts() {
+    String server = "server", host1 = "host1", host2 = "host2", host3 = "host3";
+    setValidHosts(server, Joiner.on(',').join(host1, host2, host3));
+    setValidUsers(server, "*");
+    UserImpersonation impersonation = new UserImpersonation(conf);
+
+    UsersWithHosts uwh = impersonation.get(server);
+    assertNotNull(uwh);
+
+    assertFalse(uwh.acceptsAllHosts());
+    assertTrue(uwh.acceptsAllUsers());
+
+    assertNotEquals(AlwaysTrueSet.class, uwh.getHosts().getClass());
+    assertEquals(AlwaysTrueSet.class, uwh.getUsers().getClass());
+
+    assertTrue(uwh.getHosts().contains(host1));
+    assertTrue(uwh.getHosts().contains(host2));
+    assertTrue(uwh.getHosts().contains(host3));
+    assertFalse(uwh.getHosts().contains("other_host"));
+  }
+
+  @Test
+  public void testMultipleExplicitUsersHosts() {
+    String server = "server", host1 = "host1", host2 = "host2", host3 = "host3", client1 = "client1", client2 = "client2", client3 = "client3";
+    setValidHosts(server, Joiner.on(',').join(host1, host2, host3));
+    setValidUsers(server, Joiner.on(',').join(client1, client2, client3));
+    UserImpersonation impersonation = new UserImpersonation(conf);
+
+    UsersWithHosts uwh = impersonation.get(server);
+    assertNotNull(uwh);
+
+    assertFalse(uwh.acceptsAllHosts());
+    assertFalse(uwh.acceptsAllUsers());
+
+    assertNotEquals(AlwaysTrueSet.class, uwh.getHosts().getClass());
+    assertNotEquals(AlwaysTrueSet.class, uwh.getUsers().getClass());
+
+    assertTrue(uwh.getUsers().contains(client1));
+    assertTrue(uwh.getUsers().contains(client2));
+    assertTrue(uwh.getUsers().contains(client3));
+    assertFalse(uwh.getUsers().contains("other_client"));
+
+    assertTrue(uwh.getHosts().contains(host1));
+    assertTrue(uwh.getHosts().contains(host2));
+    assertTrue(uwh.getHosts().contains(host3));
+    assertFalse(uwh.getHosts().contains("other_host"));
+  }
+
+  @Test
+  public void testMultipleAllowedImpersonators() {
+    String server1 = "server1", server2 = "server2", host1 = "host1", host2 = "host2", host3 = "host3", client1 = "client1", client2 = "client2", client3 = "client3";
+    // server1 can impersonate client1 and client2 from host1 or host2
+    setValidHosts(server1, Joiner.on(',').join(host1, host2));
+    setValidUsers(server1, Joiner.on(',').join(client1, client2));
+    // server2 can impersonate only client3 from host3
+    setValidHosts(server2, host3);
+    setValidUsers(server2, client3);
+    UserImpersonation impersonation = new UserImpersonation(conf);
+
+    UsersWithHosts uwh = impersonation.get(server1);
+    assertNotNull(uwh);
+
+    assertFalse(uwh.acceptsAllHosts());
+    assertFalse(uwh.acceptsAllUsers());
+
+    assertNotEquals(AlwaysTrueSet.class, uwh.getHosts().getClass());
+    assertNotEquals(AlwaysTrueSet.class, uwh.getUsers().getClass());
+
+    assertTrue(uwh.getUsers().contains(client1));
+    assertTrue(uwh.getUsers().contains(client2));
+    assertFalse(uwh.getUsers().contains(client3));
+    assertFalse(uwh.getUsers().contains("other_client"));
+
+    assertTrue(uwh.getHosts().contains(host1));
+    assertTrue(uwh.getHosts().contains(host2));
+    assertFalse(uwh.getHosts().contains(host3));
+    assertFalse(uwh.getHosts().contains("other_host"));
+
+    uwh = impersonation.get(server2);
+    assertNotNull(uwh);
+
+    assertFalse(uwh.acceptsAllHosts());
+    assertFalse(uwh.acceptsAllUsers());
+
+    assertNotEquals(AlwaysTrueSet.class, uwh.getHosts().getClass());
+    assertNotEquals(AlwaysTrueSet.class, uwh.getUsers().getClass());
+
+    assertFalse(uwh.getUsers().contains(client1));
+    assertFalse(uwh.getUsers().contains(client2));
+    assertTrue(uwh.getUsers().contains(client3));
+    assertFalse(uwh.getUsers().contains("other_client"));
+
+    assertFalse(uwh.getHosts().contains(host1));
+    assertFalse(uwh.getHosts().contains(host2));
+    assertTrue(uwh.getHosts().contains(host3));
+    assertFalse(uwh.getHosts().contains("other_host"));
+
+    // client3 is not allowed to impersonate anyone
+    assertNull(impersonation.get(client3));
+  }
+
+  @Test
+  public void testSingleUser() throws Exception {
+    final String server = "server/hostname@EXAMPLE.COM", client = "client@EXAMPLE.COM";
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + server + ".users", client);
+    conf.set(Property.INSTANCE_RPC_SASL_PROXYUSERS.getKey() + server + ".hosts", "*");
+    UserImpersonation impersonation = new UserImpersonation(conf);
+
+    UsersWithHosts uwh = impersonation.get(server);
+
+    assertNotNull(uwh);
+    
+    assertTrue(uwh.acceptsAllHosts());
+    assertFalse(uwh.acceptsAllUsers());
+    
+    assertTrue(uwh.getUsers().contains(client));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 7efabb6..da0b07c 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -712,7 +712,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
     Iface rpcProxy = RpcWrapper.service(this);
     final Processor<Iface> processor;
     if (ThriftServerType.SASL == getThriftServerType()) {
-      Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass());
+      Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration());
       processor = new Processor<Iface>(tcProxy);
     } else {
       processor = new Processor<Iface>(rpcProxy);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index a6ea6ea..fd0651d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -1095,7 +1095,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
     Iface rpcProxy = RpcWrapper.service(clientHandler);
     final Processor<Iface> processor;
     if (ThriftServerType.SASL == getThriftServerType()) {
-      Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass());
+      Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(), getConfiguration());
       processor = new Processor<Iface>(tcredsProxy);
     } else {
       processor = new Processor<Iface>(rpcProxy);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 4ac00e9..7d49e65 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -2277,7 +2277,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     Iface rpcProxy = RpcWrapper.service(new ThriftClientHandler());
     final Processor<Iface> processor;
     if (ThriftServerType.SASL == getThriftServerType()) {
-      Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class);
+      Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class, getConfiguration());
       processor = new Processor<Iface>(tcredProxy);
     } else {
       processor = new Processor<Iface>(rpcProxy);
@@ -2291,7 +2291,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   private HostAndPort startReplicationService() throws UnknownHostException {
     final ReplicationServicerHandler handler = new ReplicationServicerHandler(this);
     ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler);
-    ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass());
+    ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass(), getConfiguration());
     ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
     AccumuloConfiguration conf = getServerConfigurationFactory().getConfiguration();
     Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
index c632b47..bea39bf 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
@@ -49,10 +49,12 @@ import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.test.functional.FunctionalTestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.thrift.protocol.TJSONProtocol.Factory;
+import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.server.TServer;
 import org.junit.Test;
 
+import com.google.common.net.HostAndPort;
+
 public class ProxyDurabilityIT extends ConfigurableMacIT {
 
   @Override
@@ -74,11 +76,10 @@ public class ProxyDurabilityIT extends ConfigurableMacIT {
     props.put("zookeepers", c.getInstance().getZooKeepers());
     props.put("tokenClass", PasswordToken.class.getName());
 
-    Class<Factory> protocolClass = org.apache.thrift.protocol.TJSONProtocol.Factory.class;
+    TJSONProtocol.Factory protocol = new TJSONProtocol.Factory();
 
     int proxyPort = PortUtils.getRandomFreePort();
-    final TServer proxyServer = Proxy.createProxyServer(org.apache.accumulo.proxy.thrift.AccumuloProxy.class, org.apache.accumulo.proxy.ProxyServer.class,
-        proxyPort, protocolClass, props);
+    final TServer proxyServer = Proxy.createProxyServer(HostAndPort.fromParts("localhost", proxyPort), protocol, props).server;
     Thread thread = new Thread() {
       @Override
       public void run() {
@@ -88,7 +89,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT {
     thread.start();
     while (!proxyServer.isServing())
       UtilWaitThread.sleep(100);
-    Client client = new TestProxyClient("localhost", proxyPort, protocolClass.newInstance()).proxy();
+    Client client = new TestProxyClient("localhost", proxyPort, protocol).proxy();
     Map<String,String> properties = new TreeMap<String,String>();
     properties.put("password", ROOT_PASSWORD);
     ByteBuffer login = client.login("root", properties);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java b/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java
index f12658a..df91d05 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java
@@ -115,6 +115,8 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
 
+import com.google.common.net.HostAndPort;
+
 /**
  * Call every method on the proxy and try to verify that it works.
  */
@@ -140,16 +142,17 @@ public class SimpleProxyIT {
   };
   private static ByteBuffer creds = null;
 
-  private static Class<? extends TProtocolFactory> protocolClass;
+  private static TProtocolFactory protocol;
 
-  static Class<? extends TProtocolFactory> getRandomProtocol() {
+  static TProtocolFactory getRandomProtocol() throws InstantiationException, IllegalAccessException {
     List<Class<? extends TProtocolFactory>> protocolFactories = new ArrayList<Class<? extends TProtocolFactory>>();
     protocolFactories.add(org.apache.thrift.protocol.TJSONProtocol.Factory.class);
     protocolFactories.add(org.apache.thrift.protocol.TBinaryProtocol.Factory.class);
     protocolFactories.add(org.apache.thrift.protocol.TTupleProtocol.Factory.class);
     protocolFactories.add(org.apache.thrift.protocol.TCompactProtocol.Factory.class);
 
-    return protocolFactories.get(random.nextInt(protocolFactories.size()));
+    Class<? extends TProtocolFactory> clz = protocolFactories.get(random.nextInt(protocolFactories.size()));
+    return clz.newInstance();
   }
 
   private static final AtomicInteger tableCounter = new AtomicInteger(0);
@@ -195,11 +198,10 @@ public class SimpleProxyIT {
     props.put("zookeepers", accumulo.getZooKeepers());
     props.put("tokenClass", PasswordToken.class.getName());
 
-    protocolClass = getRandomProtocol();
+    protocol = getRandomProtocol();
 
     proxyPort = PortUtils.getRandomFreePort();
-    proxyServer = Proxy.createProxyServer(org.apache.accumulo.proxy.thrift.AccumuloProxy.class, org.apache.accumulo.proxy.ProxyServer.class, proxyPort,
-        protocolClass, props);
+    proxyServer = Proxy.createProxyServer(HostAndPort.fromParts("localhost", proxyPort), protocol, props).server;
     thread = new Thread() {
       @Override
       public void run() {
@@ -209,7 +211,7 @@ public class SimpleProxyIT {
     thread.start();
     while (!proxyServer.isServing())
       UtilWaitThread.sleep(100);
-    client = new TestProxyClient("localhost", proxyPort, protocolClass.newInstance()).proxy();
+    client = new TestProxyClient("localhost", proxyPort, protocol).proxy();
     creds = client.login(principal, properties);
   }
 
@@ -831,7 +833,7 @@ public class SimpleProxyIT {
       public void run() {
         String scanner;
         try {
-          Client client2 = new TestProxyClient("localhost", proxyPort, protocolClass.newInstance()).proxy();
+          Client client2 = new TestProxyClient("localhost", proxyPort, protocol).proxy();
           scanner = client2.createScanner(creds, "slow", null);
           client2.nextK(scanner, 10);
           client2.closeScanner(scanner);
@@ -885,7 +887,7 @@ public class SimpleProxyIT {
       @Override
       public void run() {
         try {
-          Client client2 = new TestProxyClient("localhost", proxyPort, protocolClass.newInstance()).proxy();
+          Client client2 = new TestProxyClient("localhost", proxyPort, protocol).proxy();
           client2.compactTable(creds, "slow", null, null, null, true, true, null);
         } catch (Exception e) {
           throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/test/src/test/java/org/apache/accumulo/proxy/TestProxyInstanceOperations.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/TestProxyInstanceOperations.java b/test/src/test/java/org/apache/accumulo/proxy/TestProxyInstanceOperations.java
index 92d065f..f4d3b64 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/TestProxyInstanceOperations.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/TestProxyInstanceOperations.java
@@ -32,6 +32,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.net.HostAndPort;
+
 public class TestProxyInstanceOperations {
   protected static TServer proxy;
   protected static Thread thread;
@@ -46,8 +48,7 @@ public class TestProxyInstanceOperations {
     prop.setProperty("useMockInstance", "true");
     prop.put("tokenClass", PasswordToken.class.getName());
 
-    proxy = Proxy.createProxyServer(Class.forName("org.apache.accumulo.proxy.thrift.AccumuloProxy"), Class.forName("org.apache.accumulo.proxy.ProxyServer"),
-        port, TCompactProtocol.Factory.class, prop);
+    proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
     thread = new Thread() {
       @Override
       public void run() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/test/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java b/test/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
index 4528a0e..b005440 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
@@ -48,6 +48,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.net.HostAndPort;
+
 public class TestProxyReadWrite {
   protected static TServer proxy;
   protected static Thread thread;
@@ -63,8 +65,7 @@ public class TestProxyReadWrite {
     prop.setProperty("useMockInstance", "true");
     prop.put("tokenClass", PasswordToken.class.getName());
 
-    proxy = Proxy.createProxyServer(Class.forName("org.apache.accumulo.proxy.thrift.AccumuloProxy"), Class.forName("org.apache.accumulo.proxy.ProxyServer"),
-        port, TCompactProtocol.Factory.class, prop);
+    proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
     thread = new Thread() {
       @Override
       public void run() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/test/src/test/java/org/apache/accumulo/proxy/TestProxySecurityOperations.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/TestProxySecurityOperations.java b/test/src/test/java/org/apache/accumulo/proxy/TestProxySecurityOperations.java
index 1a87200..5d3b443 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/TestProxySecurityOperations.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/TestProxySecurityOperations.java
@@ -41,6 +41,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.net.HostAndPort;
+
 public class TestProxySecurityOperations {
   protected static TServer proxy;
   protected static Thread thread;
@@ -57,8 +59,7 @@ public class TestProxySecurityOperations {
     prop.setProperty("useMockInstance", "true");
     prop.put("tokenClass", PasswordToken.class.getName());
 
-    proxy = Proxy.createProxyServer(Class.forName("org.apache.accumulo.proxy.thrift.AccumuloProxy"), Class.forName("org.apache.accumulo.proxy.ProxyServer"),
-        port, TCompactProtocol.Factory.class, prop);
+    proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
     thread = new Thread() {
       @Override
       public void run() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98ced20a/test/src/test/java/org/apache/accumulo/proxy/TestProxyTableOperations.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/TestProxyTableOperations.java b/test/src/test/java/org/apache/accumulo/proxy/TestProxyTableOperations.java
index dd01af9..df57ba6 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/TestProxyTableOperations.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/TestProxyTableOperations.java
@@ -43,6 +43,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.net.HostAndPort;
+
 public class TestProxyTableOperations {
 
   protected static TServer proxy;
@@ -59,8 +61,7 @@ public class TestProxyTableOperations {
     prop.setProperty("useMockInstance", "true");
     prop.put("tokenClass", PasswordToken.class.getName());
 
-    proxy = Proxy.createProxyServer(Class.forName("org.apache.accumulo.proxy.thrift.AccumuloProxy"), Class.forName("org.apache.accumulo.proxy.ProxyServer"),
-        port, TCompactProtocol.Factory.class, prop);
+    proxy = Proxy.createProxyServer(HostAndPort.fromParts("localhost", port), new TCompactProtocol.Factory(), prop).server;
     thread = new Thread() {
       @Override
       public void run() {


Mime
View raw message