accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject accumulo git commit: ACCUMULO-3706 Introduce keytab support for replication
Date Tue, 07 Apr 2015 02:40:08 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master b30dc4e81 -> f5b1c0330


ACCUMULO-3706 Introduce keytab support for replication

Need to perform a doAs in the ReplicaSystem to make sure
that we don't step on the tserver's credentials. A keytab
can be provided as authentication for a peer instead of a
password. Includes an IT for automatic verification as well
as manual verification.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f5b1c033
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f5b1c033
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f5b1c033

Branch: refs/heads/master
Commit: f5b1c0330053647898e590845f75a7d5c010735e
Parents: b30dc4e
Author: Josh Elser <elserj@apache.org>
Authored: Mon Apr 6 22:38:55 2015 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Mon Apr 6 22:38:55 2015 -0400

----------------------------------------------------------------------
 .../client/security/tokens/KerberosToken.java   |  13 +-
 .../org/apache/accumulo/core/conf/Property.java |   1 +
 docs/src/main/asciidoc/chapters/replication.txt |  10 +
 .../apache/accumulo/cluster/ClusterUser.java    |   4 +-
 .../accumulo/server/AccumuloServerContext.java  |   2 +-
 .../replication/ReplicaSystemFactory.java       |  43 ++--
 .../server/replication/ReplicaSystemHelper.java |  18 +-
 .../server/replication/ReplicationUtil.java     |  18 +-
 .../server/replication/ReplicationUtilTest.java | 113 +++++++++
 .../apache/accumulo/tserver/TabletServer.java   |  17 +-
 .../replication/AccumuloReplicaSystem.java      | 166 +++++++++++--
 .../replication/ReplicationProcessor.java       |  22 +-
 .../tserver/replication/ReplicationWorker.java  |  11 +-
 .../replication/AccumuloReplicaSystemTest.java  |  31 +++
 .../replication/ReplicationProcessorTest.java   |   8 +-
 .../accumulo/test/randomwalk/Environment.java   |   2 +-
 .../accumulo/harness/AccumuloClusterIT.java     |   2 +-
 .../accumulo/harness/SharedMiniClusterIT.java   |   7 +-
 .../conf/AccumuloMiniClusterConfiguration.java  |   2 +-
 .../StandaloneAccumuloClusterConfiguration.java |   2 +-
 .../test/replication/KerberosReplicationIT.java | 232 +++++++++++++++++++
 21 files changed, 640 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java
index fc4b934..ff353cf 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/KerberosToken.java
@@ -65,13 +65,20 @@ public class KerberosToken implements AuthenticationToken {
    *          The Kerberos principal
    * @param keytab
    *          A keytab file
+   * @param replaceCurrentUser
+   *          Should the current Hadoop user be replaced with this user
    */
-  public KerberosToken(String principal, File keytab) throws IOException {
+  public KerberosToken(String principal, File keytab, boolean replaceCurrentUser) throws IOException {
     Preconditions.checkNotNull(principal, "Principal was null");
     Preconditions.checkNotNull(keytab, "Keytab was null");
     Preconditions.checkArgument(keytab.exists() && keytab.isFile(), "Keytab was not a normal file");
-    UserGroupInformation.loginUserFromKeytab(principal, keytab.getAbsolutePath());
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation ugi;
+    if (replaceCurrentUser) {
+      UserGroupInformation.loginUserFromKeytab(principal, keytab.getAbsolutePath());
+      ugi = UserGroupInformation.getCurrentUser();
+    } else {
+      ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab.getAbsolutePath());
+    }
     this.principal = ugi.getUserName();
     this.keytab = keytab;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 2403915..0dac98e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -533,6 +533,7 @@ public enum Property {
   REPLICATION_PEER_USER("replication.peer.user.", null, PropertyType.PREFIX, "The username to provide when authenticating with the given peer"),
   @Sensitive
   REPLICATION_PEER_PASSWORD("replication.peer.password.", null, PropertyType.PREFIX, "The password to provide when authenticating with the given peer"),
+  REPLICATION_PEER_KEYTAB("replication.peer.keytab.", null, PropertyType.PREFIX, "The keytab to use when authenticating with the given peer"),
   REPLICATION_NAME("replication.name", "", PropertyType.STRING,
       "Name of this cluster with respect to replication. Used to identify this instance from other peers"),
   REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "1000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/docs/src/main/asciidoc/chapters/replication.txt
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/chapters/replication.txt b/docs/src/main/asciidoc/chapters/replication.txt
index 69bb3c4..ad48cd2 100644
--- a/docs/src/main/asciidoc/chapters/replication.txt
+++ b/docs/src/main/asciidoc/chapters/replication.txt
@@ -102,6 +102,16 @@ root@accumulo_primary> config -s replication.peer.user.peer1=replication
 root@accumulo_primary> config -s replication.peer.password.peer1=password
 ----
 
+Alternatively, when configuring replication on Accumulo running Kerberos, a keytab
+file per peer can be configured instead of a password. The provided keytabs must be readable
+by the unix user running Accumulo. The keytab for a peer can be distinct from the
+keytab used by Accumulo or any keytabs for other peers.
+
+----
+accumulo@EXAMPLE.COM@accumulo_primary> config -s replication.peer.user.peer1=replication@EXAMPLE.COM
+accumulo@EXAMPLE.COM@accumulo_primary> config -s replication.peer.keytab.peer1=/path/to/replication.keytab
+----
+
 ==== Table Configuration
 
 Now, we presently have a peer defined, so we just need to configure which tables will

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/minicluster/src/main/java/org/apache/accumulo/cluster/ClusterUser.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/ClusterUser.java b/minicluster/src/main/java/org/apache/accumulo/cluster/ClusterUser.java
index 8f45d7e..f59c47a 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/ClusterUser.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/ClusterUser.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Simple wrapper around a principal and its credentials: a password or a keytab.
@@ -82,8 +81,7 @@ public class ClusterUser {
     if (null != password) {
       return new PasswordToken(password);
     } else if (null != keytab) {
-      UserGroupInformation.loginUserFromKeytab(principal, keytab.getAbsolutePath());
-      return new KerberosToken(principal, keytab);
+      return new KerberosToken(principal, keytab, true);
     }
 
     throw new IllegalStateException("One of password and keytab must be non-null");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
index 6a59822..315ea59 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
@@ -81,7 +81,7 @@ public class AccumuloServerContext extends ClientContext {
     UserGroupInformation loginUser;
     try {
       // The system user should be logged in via keytab when the process is started, not the currentUser() like KerberosToken
-      loginUser = UserGroupInformation.getCurrentUser();
+      loginUser = UserGroupInformation.getLoginUser();
     } catch (IOException e) {
       throw new RuntimeException("Could not get login user", e);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemFactory.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemFactory.java
index 6d0876b..4764609 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemFactory.java
@@ -16,10 +16,13 @@
  */
 package org.apache.accumulo.server.replication;
 
+import java.util.Map.Entry;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 /**
  *
@@ -27,35 +30,25 @@ import com.google.common.base.Preconditions;
 public class ReplicaSystemFactory {
   private static final Logger log = LoggerFactory.getLogger(ReplicaSystemFactory.class);
 
-  private ReplicaSystemFactory() {}
-
   /**
    * @param value
    *          {@link ReplicaSystem} implementation class name
    * @return A {@link ReplicaSystem} object from the given name
    */
-  public static ReplicaSystem get(String value) {
-    Preconditions.checkNotNull(value);
-
-    int index = value.indexOf(',');
-    if (-1 == index) {
-      throw new IllegalArgumentException("Expected comma separator between replication system name and configuration");
-    }
-
-    String name = value.substring(0, index);
-    String configuration = value.substring(index + 1);
+  public ReplicaSystem get(String value) {
+    final Entry<String,String> entry = parseReplicaSystemConfiguration(value);
 
     try {
-      Class<?> clz = Class.forName(name);
+      Class<?> clz = Class.forName(entry.getKey());
 
       if (ReplicaSystem.class.isAssignableFrom(clz)) {
         Object o = clz.newInstance();
         ReplicaSystem rs = (ReplicaSystem) o;
-        rs.configure(configuration);
+        rs.configure(entry.getValue());
         return rs;
       }
 
-      throw new IllegalArgumentException("Class is not assignable to ReplicaSystem: " + name);
+      throw new IllegalArgumentException("Class is not assignable to ReplicaSystem: " + entry.getKey());
     } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
       log.error("Error creating ReplicaSystem object", e);
       throw new IllegalArgumentException(e);
@@ -63,6 +56,26 @@ public class ReplicaSystemFactory {
   }
 
   /**
+   * Parse the configuration value for a peer into its components: {@link ReplicaSystem} class name and configuration string.
+   *
+   * @param value
+   *          The configuration value for a replication peer.
+   * @return An entry where the key is the replica system name and the value is the configuration string.
+   */
+  public Entry<String,String> parseReplicaSystemConfiguration(String value) {
+    Preconditions.checkNotNull(value);
+
+    int index = value.indexOf(',');
+    if (-1 == index) {
+      throw new IllegalArgumentException("Expected comma separator between replication system name and configuration");
+    }
+
+    String name = value.substring(0, index);
+    String configuration = value.substring(index + 1);
+    return Maps.immutableEntry(name, configuration);
+  }
+
+  /**
    * Generate the configuration value for a {@link ReplicaSystem} in the instance properties
    *
    * @param system

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java
index 3d08a3c..50acc65 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java
@@ -20,32 +20,31 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  *
  */
 public class ReplicaSystemHelper {
   private static final Logger log = LoggerFactory.getLogger(ReplicaSystemHelper.class);
 
-  private Instance inst;
-  private Credentials creds;
+  private ClientContext context;
 
-  public ReplicaSystemHelper(Instance inst, Credentials creds) {
-    this.inst = inst;
-    this.creds = creds;
+  public ReplicaSystemHelper(ClientContext context) {
+    Preconditions.checkNotNull(context);
+    this.context = context;
   }
 
   /**
@@ -60,8 +59,7 @@ public class ReplicaSystemHelper {
    */
   public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) throws AccumuloException, AccumuloSecurityException,
       TableNotFoundException {
-    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    BatchWriter bw = context.getConnector().createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
     try {
       log.debug("Recording new status for {}, {}", filePath.toString(), ProtobufUtil.toString(status));
       Mutation m = new Mutation(filePath.toString());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
index c5661a8..943d211 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
@@ -61,17 +61,18 @@ public class ReplicationUtil {
   private static final Logger log = LoggerFactory.getLogger(ReplicationUtil.class);
   public static final String STATUS_FORMATTER_CLASS_NAME = StatusFormatter.class.getName();
 
-  private ZooCache zooCache;
-
   private final AccumuloServerContext context;
+  private final ZooCache zooCache;
+  private final ReplicaSystemFactory factory;
 
   public ReplicationUtil(AccumuloServerContext context) {
-    this(context, new ZooCache());
+    this(context, new ZooCache(), new ReplicaSystemFactory());
   }
 
-  public ReplicationUtil(AccumuloServerContext context, ZooCache cache) {
+  public ReplicationUtil(AccumuloServerContext context, ZooCache cache, ReplicaSystemFactory factory) {
     this.zooCache = cache;
     this.context = context;
+    this.factory = factory;
   }
 
   public int getMaxReplicationThreads(MasterMonitorInfo mmi) {
@@ -96,17 +97,18 @@ public class ReplicationUtil {
     for (Entry<String,String> property : context.getConfiguration().getAllPropertiesWithPrefix(Property.REPLICATION_PEERS).entrySet()) {
       String key = property.getKey();
       // Filter out cruft that we don't want
-      if (!key.startsWith(Property.REPLICATION_PEER_USER.getKey()) && !key.startsWith(Property.REPLICATION_PEER_PASSWORD.getKey())) {
+      if (!key.startsWith(Property.REPLICATION_PEER_USER.getKey()) && !key.startsWith(Property.REPLICATION_PEER_PASSWORD.getKey())
+          && !key.startsWith(Property.REPLICATION_PEER_KEYTAB.getKey())) {
         String peerName = property.getKey().substring(Property.REPLICATION_PEERS.getKey().length());
-        ReplicaSystem replica;
+        Entry<String,String> entry;
         try {
-          replica = ReplicaSystemFactory.get(property.getValue());
+          entry = factory.parseReplicaSystemConfiguration(property.getValue());
         } catch (Exception e) {
           log.warn("Could not instantiate ReplicaSystem for {} with configuration {}", property.getKey(), property.getValue(), e);
           continue;
         }
 
-        peers.put(peerName, replica.getClass().getName());
+        peers.put(peerName, entry.getKey());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationUtilTest.java
new file mode 100644
index 0000000..c56e88e
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationUtilTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicationUtilTest {
+
+  AccumuloServerContext context;
+  ZooCache zc;
+  AccumuloConfiguration conf;
+  Map<String,String> confEntries;
+  ReplicaSystemFactory factory;
+  ReplicationUtil util;
+
+  @Before
+  public void setup() {
+    context = EasyMock.createMock(AccumuloServerContext.class);
+    zc = EasyMock.createMock(ZooCache.class);
+    conf = EasyMock.createMock(AccumuloConfiguration.class);
+    EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes();
+    factory = new ReplicaSystemFactory();
+
+    util = new ReplicationUtil(context, zc, factory);
+    confEntries = new HashMap<>();
+  }
+
+  @Test
+  public void testUserNamePassword() {
+    final String peerName = "peer";
+    final String systemImpl = "my.replica.system.impl";
+    final String config = "accumulo_peer,remote_host:2181";
+    final String peerDefinition = systemImpl + "," + config;
+    confEntries.put(Property.REPLICATION_PEER_USER.getKey() + peerName, "user");
+    confEntries.put(Property.REPLICATION_PEER_PASSWORD.getKey() + peerName, "password");
+    confEntries.put(Property.REPLICATION_PEERS.getKey() + peerName, peerDefinition);
+    ReplicaSystem system = EasyMock.createMock(ReplicaSystem.class);
+
+    // Return out our map of data
+    EasyMock.expect(conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS)).andReturn(confEntries);
+
+    // Switch to replay
+    EasyMock.replay(context, conf, system);
+
+    // Get the peers from our map
+    Map<String,String> peers = util.getPeers();
+
+    // Verify the mocked calls
+    EasyMock.verify(context, conf, system);
+
+    // Assert one peer with expected class name and configuration
+    assertEquals(1, peers.size());
+    Entry<String,String> peer = peers.entrySet().iterator().next();
+    assertEquals(peerName, peer.getKey());
+    assertEquals(systemImpl, peer.getValue());
+  }
+
+  @Test
+  public void testUserNameKeytab() {
+    final String peerName = "peer";
+    final String systemImpl = "my.replica.system.impl";
+    final String config = "accumulo_peer,remote_host:2181";
+    final String peerDefinition = systemImpl + "," + config;
+    confEntries.put(Property.REPLICATION_PEER_USER.getKey() + peerName, "user");
+    confEntries.put(Property.REPLICATION_PEER_KEYTAB.getKey() + peerName, "/path/to/keytab");
+    confEntries.put(Property.REPLICATION_PEERS.getKey() + peerName, peerDefinition);
+    ReplicaSystem system = EasyMock.createMock(ReplicaSystem.class);
+
+    // Return out our map of data
+    EasyMock.expect(conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS)).andReturn(confEntries);
+
+    // Switch to replay
+    EasyMock.replay(context, conf, system);
+
+    // Get the peers from our map
+    Map<String,String> peers = util.getPeers();
+
+    // Verify the mocked calls
+    EasyMock.verify(context, conf, system);
+
+    // Assert one peer with expected class name and configuration
+    assertEquals(1, peers.size());
+    Entry<String,String> peer = peers.entrySet().iterator().next();
+    assertEquals(peerName, peer.getKey());
+    assertEquals(systemImpl, peer.getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 8c4dcf8..e24a49e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -236,6 +237,7 @@ import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.TServiceClient;
@@ -2953,10 +2955,21 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       ServerConfigurationFactory conf = new ServerConfigurationFactory(HdfsZooInstance.getInstance());
       VolumeManager fs = VolumeManagerImpl.get();
       Accumulo.init(fs, conf, app);
-      TabletServer server = new TabletServer(conf, fs);
+      final TabletServer server = new TabletServer(conf, fs);
       server.config(hostname);
       DistributedTrace.enable(hostname, app, conf.getConfiguration());
-      server.run();
+      if (UserGroupInformation.isSecurityEnabled()) {
+        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+        loginUser.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() {
+            server.run();
+            return null;
+          }
+        });
+      } else {
+        server.run();
+      }
     } catch (Exception ex) {
       log.error("Uncaught exception in TabletServer.main, exiting", ex);
       System.exit(1);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index a03e05d..cb8ae13 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -20,13 +20,16 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -36,6 +39,8 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.ClientExecReturn;
 import org.apache.accumulo.core.client.impl.ReplicationClient;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -68,6 +73,7 @@ import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,8 +160,77 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
   public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
     final Instance localInstance = HdfsZooInstance.getInstance();
     final AccumuloConfiguration localConf = new ServerConfigurationFactory(localInstance).getConfiguration();
-    final ClientContext peerContext = getContextForPeer(localConf, target);
 
+    final String principal = getPrincipal(localConf, target);
+    final File keytab;
+    final String password;
+    if (localConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+      String keytabPath = getKeytab(localConf, target);
+      keytab = new File(keytabPath);
+      if (!keytab.exists() || !keytab.isFile()) {
+        log.error("{} is not a regular file. Cannot login to replicate", keytabPath);
+        return status;
+      }
+      password = null;
+    } else {
+      keytab = null;
+      password = getPassword(localConf, target);
+    }
+
+    if (null != keytab) {
+      try {
+        final UserGroupInformation accumuloUgi = UserGroupInformation.getCurrentUser();
+        // Get a UGI with the principal + keytab
+        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab.getAbsolutePath());
+
+        // Run inside a doAs to avoid nuking the Tserver's user
+        return ugi.doAs(new PrivilegedAction<Status>() {
+          @Override
+          public Status run() {
+            KerberosToken token;
+            try {
+              // Do *not* replace the current user
+              token = new KerberosToken(principal, keytab, false);
+            } catch (IOException e) {
+              log.error("Failed to create KerberosToken", e);
+              return status;
+            }
+            ClientContext peerContext = getContextForPeer(localConf, target, principal, token);
+            return _replicate(p, status, target, helper, localConf, peerContext, accumuloUgi);
+          }
+        });
+      } catch (IOException e) {
+        // Can't log in, can't replicate
+        log.error("Failed to perform local login", e);
+        return status;
+      }
+    } else {
+      // Simple case: make a password token, context and then replicate
+      PasswordToken token = new PasswordToken(password);
+      ClientContext peerContext = getContextForPeer(localConf, target, principal, token);
+      return _replicate(p, status, target, helper, localConf, peerContext, null);
+    }
+  }
+
+  /**
+   * Perform replication, making a few attempts when an exception is returned.
+   *
+   * @param p
+   *          Path of WAL to replicate
+   * @param status
+   *          Current status for the WAL
+   * @param target
+   *          Where we're replicating to
+   * @param helper
+   *          A helper for replication
+   * @param localConf
+   *          The local instance's configuration
+   * @param peerContext
+   *          The ClientContext to connect to the peer
+   * @return The new (or unchanged) Status for the WAL
+   */
+  private Status _replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper,
+      final AccumuloConfiguration localConf, final ClientContext peerContext, final UserGroupInformation accumuloUgi) {
     try {
       Trace.on("AccumuloReplicaSystem");
 
@@ -166,7 +241,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       // trying to replicate it again later some other time.
       int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
       for (int i = 0; i < numAttempts; i++) {
+        log.debug("Attempt {}", i);
         String peerTserverStr;
+        log.debug("Fetching peer tserver address");
         Span span = Trace.start("Fetch peer tserver");
         try {
           // Ask the master on the remote what TServer we should talk with to replicate the data
@@ -208,7 +285,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
           } else {
             span = Trace.start("WAL replication");
             try {
-              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper);
+              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
             } finally {
               span.stop();
             }
@@ -279,9 +356,10 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
   }
 
   protected Status replicateLogs(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status,
-      final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException,
-      AccumuloException, AccumuloSecurityException {
+      final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper, final UserGroupInformation accumuloUgi)
+      throws TTransportException, AccumuloException, AccumuloSecurityException {
 
+    log.info("Replication WAL to peer tserver");
     final Set<Integer> tids;
     final DataInputStream input;
     Span span = Trace.start("Read WAL header");
@@ -315,6 +393,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       span.stop();
     }
 
+    log.info("Skipping unwanted data in WAL");
     span = Trace.start("Consume WAL prefix");
     span.data("file", p.toString());
     try {
@@ -328,7 +407,10 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       span.stop();
     }
 
+    log.info("Sending batches of data to peer tserver");
+
     Status lastStatus = status, currentStatus = status;
+    final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
     while (true) {
       // Set some trace info
       span = Trace.start("Replicate WAL batch");
@@ -364,7 +446,34 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       if (!currentStatus.equals(lastStatus)) {
         span = Trace.start("Update replication table");
         try {
-          helper.recordNewStatus(p, currentStatus, target);
+          if (null != accumuloUgi) {
+            final Status copy = currentStatus;
+            accumuloUgi.doAs(new PrivilegedAction<Void>() {
+              @Override
+              public Void run() {
+                try {
+                  helper.recordNewStatus(p, copy, target);
+                } catch (Exception e) {
+                  exceptionRef.set(e);
+                }
+                return null;
+              }
+            });
+            Exception e = exceptionRef.get();
+            if (null != e) {
+              if (e instanceof TableNotFoundException) {
+                throw (TableNotFoundException) e;
+              } else if (e instanceof AccumuloSecurityException) {
+                throw (AccumuloSecurityException) e;
+              } else if (e instanceof AccumuloException) {
+                throw (AccumuloException) e;
+              } else {
+                throw new RuntimeException("Received unexpected exception", e);
+              }
+            }
+          } else {
+            helper.recordNewStatus(p, currentStatus, target);
+          }
         } catch (TableNotFoundException e) {
           log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
           throw new RuntimeException("Replication table did not exist, will retry", e);
@@ -421,6 +530,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
       // If we have some edits to send
       if (0 < edits.walEdits.getEditsSize()) {
+        log.info("Sending {} edits", edits.walEdits.getEditsSize());
         long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
         if (entriesReplicated != edits.numUpdates) {
           log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated);
@@ -479,25 +589,55 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     }
   }
 
-  private ClientContext getContextForPeer(AccumuloConfiguration localConf, ReplicationTarget target) {
+  protected String getPassword(AccumuloConfiguration localConf, ReplicationTarget target) {
+    Preconditions.checkNotNull(localConf);
+    Preconditions.checkNotNull(target);
+
+    Map<String,String> peerPasswords = localConf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_PASSWORD);
+    String password = peerPasswords.get(Property.REPLICATION_PEER_PASSWORD.getKey() + target.getPeerName());
+    if (null == password) {
+      throw new IllegalArgumentException("Cannot get password for " + target.getPeerName());
+    }
+    return password;
+  }
+
+  protected String getKeytab(AccumuloConfiguration localConf, ReplicationTarget target) {
+    Preconditions.checkNotNull(localConf);
+    Preconditions.checkNotNull(target);
+
+    Map<String,String> peerKeytabs = localConf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_KEYTAB);
+    String keytab = peerKeytabs.get(Property.REPLICATION_PEER_KEYTAB.getKey() + target.getPeerName());
+    if (null == keytab) {
+      throw new IllegalArgumentException("Cannot get keytab for " + target.getPeerName());
+    }
+    return keytab;
+  }
+
+  protected String getPrincipal(AccumuloConfiguration localConf, ReplicationTarget target) {
     Preconditions.checkNotNull(localConf);
     Preconditions.checkNotNull(target);
 
     String peerName = target.getPeerName();
-    String userKey = Property.REPLICATION_PEER_USER.getKey() + peerName, passwordKey = Property.REPLICATION_PEER_PASSWORD.getKey() + peerName;
+    String userKey = Property.REPLICATION_PEER_USER.getKey() + peerName;
     Map<String,String> peerUsers = localConf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_USER);
-    Map<String,String> peerPasswords = localConf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_PASSWORD);
 
     String user = peerUsers.get(userKey);
-    String password = peerPasswords.get(passwordKey);
-    if (null == user || null == password) {
-      throw new IllegalArgumentException(userKey + " and " + passwordKey + " not configured, cannot replicate");
+    if (null == user) {
+      throw new IllegalArgumentException("Cannot get user for " + target.getPeerName());
     }
+    return user;
+  }
+
+  protected ClientContext getContextForPeer(AccumuloConfiguration localConf, ReplicationTarget target, String principal, AuthenticationToken token) {
+    Preconditions.checkNotNull(localConf);
+    Preconditions.checkNotNull(target);
+    Preconditions.checkNotNull(principal);
+    Preconditions.checkNotNull(token);
 
-    return new ClientContext(getPeerInstance(target), new Credentials(user, new PasswordToken(password)), localConf);
+    return new ClientContext(getPeerInstance(target), new Credentials(principal, token), localConf);
   }
 
-  private Instance getPeerInstance(ReplicationTarget target) {
+  protected Instance getPeerInstance(ReplicationTarget target) {
     return new ZooKeeperInstance(instanceName, zookeepers);
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 8e39d1c..8b825f8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -24,8 +24,8 @@ import java.util.NoSuchElementException;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Range;
@@ -34,8 +34,6 @@ import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
 import org.apache.accumulo.server.replication.ReplicaSystem;
@@ -57,23 +55,23 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class ReplicationProcessor implements Processor {
   private static final Logger log = LoggerFactory.getLogger(ReplicationProcessor.class);
 
-  private final Instance inst;
+  private final ClientContext context;
   private final AccumuloConfiguration conf;
   private final VolumeManager fs;
-  private final Credentials creds;
   private final ReplicaSystemHelper helper;
+  private final ReplicaSystemFactory factory;
 
-  public ReplicationProcessor(Instance inst, AccumuloConfiguration conf, VolumeManager fs, Credentials creds) {
-    this.inst = inst;
+  public ReplicationProcessor(ClientContext context, AccumuloConfiguration conf, VolumeManager fs) {
+    this.context = context;
     this.conf = conf;
     this.fs = fs;
-    this.creds = creds;
-    this.helper = new ReplicaSystemHelper(inst, creds);
+    this.helper = new ReplicaSystemHelper(context);
+    this.factory = new ReplicaSystemFactory();
   }
 
   @Override
   public ReplicationProcessor newProcessor() {
-    return new ReplicationProcessor(inst, new ServerConfigurationFactory(inst).getConfiguration(), fs, creds);
+    return new ReplicationProcessor(context, context.getConfiguration(), fs);
   }
 
   @Override
@@ -146,7 +144,7 @@ public class ReplicationProcessor implements Processor {
     String peerType = getPeerType(target.getPeerName());
 
     // Get the peer that we're replicating to
-    return ReplicaSystemFactory.get(peerType);
+    return factory.get(peerType);
   }
 
   protected String getPeerType(String peerName) {
@@ -173,7 +171,7 @@ public class ReplicationProcessor implements Processor {
 
   protected Status getStatus(String file, ReplicationTarget target) throws ReplicationTableOfflineException, AccumuloException, AccumuloSecurityException,
       InvalidProtocolBufferException {
-    Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
+    Scanner s = ReplicationTable.getScanner(context.getConnector());
     s.setRange(Range.exact(file));
     s.fetchColumn(WorkSection.NAME, target.toText());
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index de99029..5e40ff6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -24,7 +24,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.replication.ReplicationConstants;
-import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
@@ -33,22 +32,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- *
+ * Launches the {@link ReplicationProcessor}
  */
 public class ReplicationWorker implements Runnable {
   private static final Logger log = LoggerFactory.getLogger(ReplicationWorker.class);
 
+  private ClientContext context;
   private Instance inst;
-  private VolumeManager fs;
-  private Credentials creds;
   private AccumuloConfiguration conf;
+  private VolumeManager fs;
   private ThreadPoolExecutor executor;
 
   public ReplicationWorker(ClientContext clientCtx, VolumeManager fs) {
+    this.context = clientCtx;
     this.inst = clientCtx.getInstance();
     this.fs = fs;
     this.conf = clientCtx.getConfiguration();
-    this.creds = clientCtx.getCredentials();
   }
 
   public void setExecutor(ThreadPoolExecutor executor) {
@@ -72,7 +71,7 @@ public class ReplicationWorker implements Runnable {
         workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_WORK_QUEUE, conf);
       }
 
-      workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, creds), executor);
+      workQueue.startProcessing(new ReplicationProcessor(context, conf, fs), executor);
     } catch (KeeperException | InterruptedException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
index 103fd81..e504dc0 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
@@ -20,6 +20,7 @@ import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -494,4 +495,34 @@ public class AccumuloReplicaSystemTest {
 
     Assert.assertEquals(new ReplicationStats(0l, 0l, 5l), stats);
   }
+
+  @Test
+  public void testUserPassword() throws Exception {
+    AccumuloReplicaSystem ars = new AccumuloReplicaSystem();
+    ReplicationTarget target = new ReplicationTarget("peer", "peer_table", "1");
+    String user = "user", password = "password";
+
+    Map<String,String> confMap = new HashMap<>();
+    confMap.put(Property.REPLICATION_PEER_USER.getKey() + target.getPeerName(), user);
+    confMap.put(Property.REPLICATION_PEER_PASSWORD.getKey() + target.getPeerName(), password);
+    AccumuloConfiguration conf = new ConfigurationCopy(confMap);
+
+    assertEquals(user, ars.getPrincipal(conf, target));
+    assertEquals(password, ars.getPassword(conf, target));
+  }
+
+  @Test
+  public void testUserKeytab() throws Exception {
+    AccumuloReplicaSystem ars = new AccumuloReplicaSystem();
+    ReplicationTarget target = new ReplicationTarget("peer", "peer_table", "1");
+    String user = "user", keytab = "/etc/security/keytabs/replication.keytab";
+
+    Map<String,String> confMap = new HashMap<>();
+    confMap.put(Property.REPLICATION_PEER_USER.getKey() + target.getPeerName(), user);
+    confMap.put(Property.REPLICATION_PEER_KEYTAB.getKey() + target.getPeerName(), keytab);
+    AccumuloConfiguration conf = new ConfigurationCopy(confMap);
+
+    assertEquals(user, ars.getPrincipal(conf, target));
+    assertEquals(keytab, ars.getKeytab(conf, target));
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index 838ba1a..0c444b9 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -21,7 +21,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
@@ -44,6 +46,7 @@ public class ReplicationProcessorTest {
     Instance inst = EasyMock.createMock(Instance.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
     Credentials creds = new Credentials("foo", new PasswordToken("bar"));
+    ClientContext context = new ClientContext(inst, creds, new ClientConfiguration());
 
     Map<String,String> data = new HashMap<>();
 
@@ -52,7 +55,7 @@ public class ReplicationProcessorTest {
     data.put(Property.REPLICATION_PEERS + peerName, configuration);
     ConfigurationCopy conf = new ConfigurationCopy(data);
 
-    ReplicationProcessor proc = new ReplicationProcessor(inst, conf, fs, creds);
+    ReplicationProcessor proc = new ReplicationProcessor(context, conf, fs);
 
     Assert.assertEquals(configuration, proc.getPeerType(peerName));
   }
@@ -62,11 +65,12 @@ public class ReplicationProcessorTest {
     Instance inst = EasyMock.createMock(Instance.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
     Credentials creds = new Credentials("foo", new PasswordToken("bar"));
+    ClientContext context = new ClientContext(inst, creds, new ClientConfiguration());
 
     Map<String,String> data = new HashMap<>();
     ConfigurationCopy conf = new ConfigurationCopy(data);
 
-    ReplicationProcessor proc = new ReplicationProcessor(inst, conf, fs, creds);
+    ReplicationProcessor proc = new ReplicationProcessor(context, conf, fs);
 
     proc.getPeerType("foo");
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/test/src/main/java/org/apache/accumulo/test/randomwalk/Environment.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/Environment.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/Environment.java
index 992c08a..616ec63 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Environment.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Environment.java
@@ -169,7 +169,7 @@ public class Environment {
         throw new IllegalArgumentException("Provided keytab is not a normal file: "+ keytab);
       }
       try {
-        return new KerberosToken(getUserName(), keytabFile);
+        return new KerberosToken(getUserName(), keytabFile, true);
       } catch (IOException e) {
         throw new RuntimeException("Failed to login", e);
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
index 250e01e..df0ddf0 100644
--- a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
+++ b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
@@ -169,7 +169,7 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste
           UserGroupInformation.loginUserFromKeytab(systemUser.getPrincipal(), systemUser.getKeytab().getAbsolutePath());
 
           // Open a connector as the system user (ensures the user will exist for us to assign permissions to)
-          Connector conn = cluster.getConnector(systemUser.getPrincipal(), new KerberosToken(systemUser.getPrincipal(), systemUser.getKeytab()));
+          Connector conn = cluster.getConnector(systemUser.getPrincipal(), new KerberosToken(systemUser.getPrincipal(), systemUser.getKeytab(), true));
 
           // Then, log back in as the "root" user and do the grant
           UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/test/src/test/java/org/apache/accumulo/harness/SharedMiniClusterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/SharedMiniClusterIT.java b/test/src/test/java/org/apache/accumulo/harness/SharedMiniClusterIT.java
index b47e604..98ebddc 100644
--- a/test/src/test/java/org/apache/accumulo/harness/SharedMiniClusterIT.java
+++ b/test/src/test/java/org/apache/accumulo/harness/SharedMiniClusterIT.java
@@ -73,10 +73,9 @@ public abstract class SharedMiniClusterIT extends AccumuloIT implements ClusterU
       UserGroupInformation.setConfiguration(conf);
       // Login as the client
       ClusterUser rootUser = krb.getRootUser();
-      UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath());
       // Get the krb token
       principal = rootUser.getPrincipal();
-      token = new KerberosToken(principal, rootUser.getKeytab());
+      token = new KerberosToken(principal, rootUser.getKeytab(), true);
     } else {
       rootPassword = "rootPasswordShared1";
       token = new PasswordToken(rootPassword);
@@ -89,10 +88,8 @@ public abstract class SharedMiniClusterIT extends AccumuloIT implements ClusterU
       final String traceTable = Property.TRACE_TABLE.getDefaultValue();
       final ClusterUser systemUser = krb.getAccumuloServerUser(), rootUser = krb.getRootUser();
       // Login as the trace user
-      UserGroupInformation.loginUserFromKeytab(systemUser.getPrincipal(), systemUser.getKeytab().getAbsolutePath());
-
       // Open a connector as the system user (ensures the user will exist for us to assign permissions to)
-      Connector conn = cluster.getConnector(systemUser.getPrincipal(), new KerberosToken(systemUser.getPrincipal(), systemUser.getKeytab()));
+      Connector conn = cluster.getConnector(systemUser.getPrincipal(), new KerberosToken(systemUser.getPrincipal(), systemUser.getKeytab(), true));
 
       // Then, log back in as the "root" user and do the grant
       UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/test/src/test/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java b/test/src/test/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java
index a1295dc..cb43556 100644
--- a/test/src/test/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java
+++ b/test/src/test/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java
@@ -86,7 +86,7 @@ public class AccumuloMiniClusterConfiguration extends AccumuloClusterPropertyCon
 
       ClusterUser rootUser = AccumuloClusterIT.getKdc().getRootUser();
       try {
-        return new KerberosToken(rootUser.getPrincipal(), rootUser.getKeytab());
+        return new KerberosToken(rootUser.getPrincipal(), rootUser.getKeytab(), true);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/test/src/test/java/org/apache/accumulo/harness/conf/StandaloneAccumuloClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/conf/StandaloneAccumuloClusterConfiguration.java b/test/src/test/java/org/apache/accumulo/harness/conf/StandaloneAccumuloClusterConfiguration.java
index aed0a21..22fe103 100644
--- a/test/src/test/java/org/apache/accumulo/harness/conf/StandaloneAccumuloClusterConfiguration.java
+++ b/test/src/test/java/org/apache/accumulo/harness/conf/StandaloneAccumuloClusterConfiguration.java
@@ -163,7 +163,7 @@ public class StandaloneAccumuloClusterConfiguration extends AccumuloClusterPrope
     if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
       File keytab = getAdminKeytab();
       try {
-        return new KerberosToken(getAdminPrincipal(), keytab);
+        return new KerberosToken(getAdminPrincipal(), keytab, true);
       } catch (IOException e) {
         // The user isn't logged in
         throw new RuntimeException("Failed to create KerberosToken", e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5b1c033/test/src/test/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java
new file mode 100644
index 0000000..382cda0
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.cluster.ClusterUser;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.harness.AccumuloIT;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.MiniClusterHarness;
+import org.apache.accumulo.harness.TestingKdc;
+import org.apache.accumulo.master.replication.SequentialWorkAssigner;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.replication.ReplicaSystemFactory;
+import org.apache.accumulo.test.functional.KerberosIT;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ensure that replication occurs using a keytab instead of a password (not to mention SASL)
+ */
+public class KerberosReplicationIT extends AccumuloIT {
+  private static final Logger log = LoggerFactory.getLogger(KerberosIT.class);
+
+  private static TestingKdc kdc;
+  private static String krbEnabledForITs = null;
+  private static ClusterUser rootUser;
+
+  @BeforeClass
+  public static void startKdc() throws Exception {
+    kdc = new TestingKdc();
+    kdc.start();
+    krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION);
+    if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) {
+      System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true");
+    }
+    rootUser = kdc.getRootUser();
+  }
+
+  @AfterClass
+  public static void stopKdc() throws Exception {
+    if (null != kdc) {
+      kdc.stop();
+    }
+    if (null != krbEnabledForITs) {
+      System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs);
+    }
+  }
+
+  private MiniAccumuloClusterImpl primary, peer;
+  private String PRIMARY_NAME = "primary", PEER_NAME = "peer";
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60 * 3;
+  }
+
+  private MiniClusterConfigurationCallback getConfigCallback(final String name) {
+    return new MiniClusterConfigurationCallback() {
+      @Override
+      public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+        cfg.setNumTservers(1);
+        cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+        cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
+        cfg.setProperty(Property.GC_CYCLE_START, "1s");
+        cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
+        cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+        cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+        cfg.setProperty(Property.REPLICATION_NAME, name);
+        cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
+        cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+        cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
+        coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+      }
+    };
+  }
+
+  @Before
+  public void setup() throws Exception {
+    MiniClusterHarness harness = new MiniClusterHarness();
+
+    // Create a primary and a peer instance, both with the same "root" user
+    primary = harness.create(getClass().getName(), testName.getMethodName(), new PasswordToken("unused"), getConfigCallback(PRIMARY_NAME), kdc);
+    primary.start();
+
+    peer = harness.create(getClass().getName(), testName.getMethodName() + "_peer", new PasswordToken("unused"), getConfigCallback(PEER_NAME), kdc);
+    peer.start();
+
+    // Enable kerberos auth
+    Configuration conf = new Configuration(false);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (null != peer) {
+      peer.stop();
+    }
+    if (null != primary) {
+      primary.stop();
+    }
+  }
+
+  @Test
+  public void dataReplicatedToCorrectTable() throws Exception {
+    // Login as the root user
+    UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath());
+
+    final KerberosToken token = new KerberosToken();
+    final Connector primaryConn = primary.getConnector(rootUser.getPrincipal(), token);
+    final Connector peerConn = peer.getConnector(rootUser.getPrincipal(), token);
+
+    ClusterUser replicationUser = kdc.getClientPrincipal(0);
+
+    // Create user for replication to the peer
+    peerConn.securityOperations().createLocalUser(replicationUser.getPrincipal(), null);
+
+    primaryConn.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + PEER_NAME, replicationUser.getPrincipal());
+    primaryConn.instanceOperations().setProperty(Property.REPLICATION_PEER_KEYTAB.getKey() + PEER_NAME, replicationUser.getKeytab().getAbsolutePath());
+
+    // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+    primaryConn.instanceOperations().setProperty(
+        Property.REPLICATION_PEERS.getKey() + PEER_NAME,
+        ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+            AccumuloReplicaSystem.buildConfiguration(peerConn.getInstance().getInstanceName(), peerConn.getInstance().getZooKeepers())));
+
+    String primaryTable1 = "primary", peerTable1 = "peer";
+
+    // Create tables
+    primaryConn.tableOperations().create(primaryTable1);
+    String masterTableId1 = primaryConn.tableOperations().tableIdMap().get(primaryTable1);
+    Assert.assertNotNull(masterTableId1);
+
+    peerConn.tableOperations().create(peerTable1);
+    String peerTableId1 = peerConn.tableOperations().tableIdMap().get(peerTable1);
+    Assert.assertNotNull(peerTableId1);
+
+    // Grant write permission
+    peerConn.securityOperations().grantTablePermission(replicationUser.getPrincipal(), peerTable1, TablePermission.WRITE);
+
+    // Replicate this table to the peer cluster (PEER_NAME), into the table whose id is peerTableId1
+    primaryConn.tableOperations().setProperty(primaryTable1, Property.TABLE_REPLICATION.getKey(), "true");
+    primaryConn.tableOperations().setProperty(primaryTable1, Property.TABLE_REPLICATION_TARGET.getKey() + PEER_NAME, peerTableId1);
+
+    // Write some data to table1
+    BatchWriter bw = primaryConn.createBatchWriter(primaryTable1, new BatchWriterConfig());
+    long masterTable1Records = 0l;
+    for (int rows = 0; rows < 2500; rows++) {
+      Mutation m = new Mutation(primaryTable1 + rows);
+      for (int cols = 0; cols < 100; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+        masterTable1Records++;
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    log.info("Wrote all data to primary cluster");
+
+    Set<String> filesFor1 = primaryConn.replicationOperations().referencedFiles(primaryTable1);
+
+    // Restart the tserver to force a close on the WAL
+    for (ProcessReference proc : primary.getProcesses().get(ServerType.TABLET_SERVER)) {
+      primary.killProcess(ServerType.TABLET_SERVER, proc);
+    }
+    primary.exec(TabletServer.class);
+
+    log.info("Restarted the tserver");
+
+    // Read the data -- the tserver is back up and running and tablets are assigned
+    for (@SuppressWarnings("unused")
+    Entry<Key,Value> entry : primaryConn.createScanner(primaryTable1, Authorizations.EMPTY)) {}
+
+    // Wait for the table to be replicated to the peer
+    log.info("Waiting for {} for {}", filesFor1, primaryTable1);
+    primaryConn.replicationOperations().drain(primaryTable1, filesFor1);
+
+    long countTable = 0l;
+    for (Entry<Key,Value> entry : peerConn.createScanner(peerTable1, Authorizations.EMPTY)) {
+      countTable++;
+      Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+          .startsWith(primaryTable1));
+    }
+
+    log.info("Found {} records in {}", countTable, peerTable1);
+    Assert.assertEquals(masterTable1Records, countTable);
+  }
+}


Mime
View raw message