accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [1/6] ACCUMULO-2587 First addition of authentication between replication service and client
Date Tue, 27 May 2014 04:04:17 GMT
Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-378 2425fd24b -> 9d9b5ed24


http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java
index e5e26ca..e297445 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java
@@ -50,17 +50,17 @@ import org.slf4j.LoggerFactory;
 
   public interface Iface {
 
-    public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, org.apache.thrift.TException;
+    public long replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException;
 
-    public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, org.apache.thrift.TException;
+    public long replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException;
 
   }
 
   public interface AsyncIface {
 
-    public void replicateLog(int remoteTableId, WalEdits data, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateLog_call> resultHandler) throws org.apache.thrift.TException;
+    public void replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateLog_call> resultHandler) throws org.apache.thrift.TException;
 
-    public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException;
+    public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException;
 
   }
 
@@ -84,17 +84,18 @@ import org.slf4j.LoggerFactory;
       super(iprot, oprot);
     }
 
-    public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, org.apache.thrift.TException
+    public long replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException
     {
-      send_replicateLog(remoteTableId, data);
+      send_replicateLog(remoteTableId, data, credentials);
       return recv_replicateLog();
     }
 
-    public void send_replicateLog(int remoteTableId, WalEdits data) throws org.apache.thrift.TException
+    public void send_replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException
     {
       replicateLog_args args = new replicateLog_args();
       args.setRemoteTableId(remoteTableId);
       args.setData(data);
+      args.setCredentials(credentials);
       sendBase("replicateLog", args);
     }
 
@@ -111,17 +112,18 @@ import org.slf4j.LoggerFactory;
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "replicateLog failed: unknown result");
     }
 
-    public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, org.apache.thrift.TException
+    public long replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException
     {
-      send_replicateKeyValues(remoteTableId, data);
+      send_replicateKeyValues(remoteTableId, data, credentials);
       return recv_replicateKeyValues();
     }
 
-    public void send_replicateKeyValues(int remoteTableId, KeyValues data) throws org.apache.thrift.TException
+    public void send_replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException
     {
       replicateKeyValues_args args = new replicateKeyValues_args();
       args.setRemoteTableId(remoteTableId);
       args.setData(data);
+      args.setCredentials(credentials);
       sendBase("replicateKeyValues", args);
     }
 
@@ -156,9 +158,9 @@ import org.slf4j.LoggerFactory;
       super(protocolFactory, clientManager, transport);
     }
 
-    public void replicateLog(int remoteTableId, WalEdits data, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler) throws org.apache.thrift.TException {
+    public void replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      replicateLog_call method_call = new replicateLog_call(remoteTableId, data, resultHandler, this, ___protocolFactory, ___transport);
+      replicateLog_call method_call = new replicateLog_call(remoteTableId, data, credentials, resultHandler, this, ___protocolFactory, ___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
@@ -166,10 +168,12 @@ import org.slf4j.LoggerFactory;
     public static class replicateLog_call extends org.apache.thrift.async.TAsyncMethodCall {
       private int remoteTableId;
       private WalEdits data;
-      public replicateLog_call(int remoteTableId, WalEdits data, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+      private org.apache.accumulo.core.security.thrift.TCredentials credentials;
+      public replicateLog_call(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
         super(client, protocolFactory, transport, resultHandler, false);
         this.remoteTableId = remoteTableId;
         this.data = data;
+        this.credentials = credentials;
       }
 
       public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -177,6 +181,7 @@ import org.slf4j.LoggerFactory;
         replicateLog_args args = new replicateLog_args();
         args.setRemoteTableId(remoteTableId);
         args.setData(data);
+        args.setCredentials(credentials);
         args.write(prot);
         prot.writeMessageEnd();
       }
@@ -191,9 +196,9 @@ import org.slf4j.LoggerFactory;
       }
     }
 
-    public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException {
+    public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      replicateKeyValues_call method_call = new replicateKeyValues_call(remoteTableId, data, resultHandler, this, ___protocolFactory, ___transport);
+      replicateKeyValues_call method_call = new replicateKeyValues_call(remoteTableId, data, credentials, resultHandler, this, ___protocolFactory, ___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
@@ -201,10 +206,12 @@ import org.slf4j.LoggerFactory;
     public static class replicateKeyValues_call extends org.apache.thrift.async.TAsyncMethodCall {
       private int remoteTableId;
       private KeyValues data;
-      public replicateKeyValues_call(int remoteTableId, KeyValues data, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+      private org.apache.accumulo.core.security.thrift.TCredentials credentials;
+      public replicateKeyValues_call(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
         super(client, protocolFactory, transport, resultHandler, false);
         this.remoteTableId = remoteTableId;
         this.data = data;
+        this.credentials = credentials;
       }
 
       public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -212,6 +219,7 @@ import org.slf4j.LoggerFactory;
         replicateKeyValues_args args = new replicateKeyValues_args();
         args.setRemoteTableId(remoteTableId);
         args.setData(data);
+        args.setCredentials(credentials);
         args.write(prot);
         prot.writeMessageEnd();
       }
@@ -260,7 +268,7 @@ import org.slf4j.LoggerFactory;
       public replicateLog_result getResult(I iface, replicateLog_args args) throws org.apache.thrift.TException {
         replicateLog_result result = new replicateLog_result();
         try {
-          result.success = iface.replicateLog(args.remoteTableId, args.data);
+          result.success = iface.replicateLog(args.remoteTableId, args.data, args.credentials);
           result.setSuccessIsSet(true);
         } catch (RemoteReplicationException e) {
           result.e = e;
@@ -285,7 +293,7 @@ import org.slf4j.LoggerFactory;
       public replicateKeyValues_result getResult(I iface, replicateKeyValues_args args) throws org.apache.thrift.TException {
         replicateKeyValues_result result = new replicateKeyValues_result();
         try {
-          result.success = iface.replicateKeyValues(args.remoteTableId, args.data);
+          result.success = iface.replicateKeyValues(args.remoteTableId, args.data, args.credentials);
           result.setSuccessIsSet(true);
         } catch (RemoteReplicationException e) {
           result.e = e;
@@ -301,6 +309,7 @@ import org.slf4j.LoggerFactory;
 
     private static final org.apache.thrift.protocol.TField REMOTE_TABLE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("remoteTableId", org.apache.thrift.protocol.TType.I32, (short)1);
     private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+    private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)3);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
@@ -310,11 +319,13 @@ import org.slf4j.LoggerFactory;
 
     public int remoteTableId; // required
     public WalEdits data; // required
+    public org.apache.accumulo.core.security.thrift.TCredentials credentials; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
       REMOTE_TABLE_ID((short)1, "remoteTableId"),
-      DATA((short)2, "data");
+      DATA((short)2, "data"),
+      CREDENTIALS((short)3, "credentials");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -333,6 +344,8 @@ import org.slf4j.LoggerFactory;
             return REMOTE_TABLE_ID;
           case 2: // DATA
             return DATA;
+          case 3: // CREDENTIALS
+            return CREDENTIALS;
           default:
             return null;
         }
@@ -382,6 +395,8 @@ import org.slf4j.LoggerFactory;
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
       tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WalEdits.class)));
+      tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.security.thrift.TCredentials.class)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(replicateLog_args.class, metaDataMap);
     }
@@ -391,12 +406,14 @@ import org.slf4j.LoggerFactory;
 
     public replicateLog_args(
       int remoteTableId,
-      WalEdits data)
+      WalEdits data,
+      org.apache.accumulo.core.security.thrift.TCredentials credentials)
     {
       this();
       this.remoteTableId = remoteTableId;
       setRemoteTableIdIsSet(true);
       this.data = data;
+      this.credentials = credentials;
     }
 
     /**
@@ -408,6 +425,9 @@ import org.slf4j.LoggerFactory;
       if (other.isSetData()) {
         this.data = new WalEdits(other.data);
       }
+      if (other.isSetCredentials()) {
+        this.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(other.credentials);
+      }
     }
 
     public replicateLog_args deepCopy() {
@@ -419,6 +439,7 @@ import org.slf4j.LoggerFactory;
       setRemoteTableIdIsSet(false);
       this.remoteTableId = 0;
       this.data = null;
+      this.credentials = null;
     }
 
     public int getRemoteTableId() {
@@ -468,6 +489,30 @@ import org.slf4j.LoggerFactory;
       }
     }
 
+    public org.apache.accumulo.core.security.thrift.TCredentials getCredentials() {
+      return this.credentials;
+    }
+
+    public replicateLog_args setCredentials(org.apache.accumulo.core.security.thrift.TCredentials credentials) {
+      this.credentials = credentials;
+      return this;
+    }
+
+    public void unsetCredentials() {
+      this.credentials = null;
+    }
+
+    /** Returns true if field credentials is set (has been assigned a value) and false otherwise */
+    public boolean isSetCredentials() {
+      return this.credentials != null;
+    }
+
+    public void setCredentialsIsSet(boolean value) {
+      if (!value) {
+        this.credentials = null;
+      }
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case REMOTE_TABLE_ID:
@@ -486,6 +531,14 @@ import org.slf4j.LoggerFactory;
         }
         break;
 
+      case CREDENTIALS:
+        if (value == null) {
+          unsetCredentials();
+        } else {
+          setCredentials((org.apache.accumulo.core.security.thrift.TCredentials)value);
+        }
+        break;
+
       }
     }
 
@@ -497,6 +550,9 @@ import org.slf4j.LoggerFactory;
       case DATA:
         return getData();
 
+      case CREDENTIALS:
+        return getCredentials();
+
       }
       throw new IllegalStateException();
     }
@@ -512,6 +568,8 @@ import org.slf4j.LoggerFactory;
         return isSetRemoteTableId();
       case DATA:
         return isSetData();
+      case CREDENTIALS:
+        return isSetCredentials();
       }
       throw new IllegalStateException();
     }
@@ -547,6 +605,15 @@ import org.slf4j.LoggerFactory;
           return false;
       }
 
+      boolean this_present_credentials = true && this.isSetCredentials();
+      boolean that_present_credentials = true && that.isSetCredentials();
+      if (this_present_credentials || that_present_credentials) {
+        if (!(this_present_credentials && that_present_credentials))
+          return false;
+        if (!this.credentials.equals(that.credentials))
+          return false;
+      }
+
       return true;
     }
 
@@ -583,6 +650,16 @@ import org.slf4j.LoggerFactory;
           return lastComparison;
         }
       }
+      lastComparison = Boolean.valueOf(isSetCredentials()).compareTo(typedOther.isSetCredentials());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetCredentials()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, typedOther.credentials);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -614,6 +691,14 @@ import org.slf4j.LoggerFactory;
         sb.append(this.data);
       }
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("credentials:");
+      if (this.credentials == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.credentials);
+      }
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -624,6 +709,9 @@ import org.slf4j.LoggerFactory;
       if (data != null) {
         data.validate();
       }
+      if (credentials != null) {
+        credentials.validate();
+      }
     }
 
     private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -679,6 +767,15 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
+            case 3: // CREDENTIALS
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+                struct.credentials.read(iprot);
+                struct.setCredentialsIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -702,6 +799,11 @@ import org.slf4j.LoggerFactory;
           struct.data.write(oprot);
           oprot.writeFieldEnd();
         }
+        if (struct.credentials != null) {
+          oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC);
+          struct.credentials.write(oprot);
+          oprot.writeFieldEnd();
+        }
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -726,19 +828,25 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetData()) {
           optionals.set(1);
         }
-        oprot.writeBitSet(optionals, 2);
+        if (struct.isSetCredentials()) {
+          optionals.set(2);
+        }
+        oprot.writeBitSet(optionals, 3);
         if (struct.isSetRemoteTableId()) {
           oprot.writeI32(struct.remoteTableId);
         }
         if (struct.isSetData()) {
           struct.data.write(oprot);
         }
+        if (struct.isSetCredentials()) {
+          struct.credentials.write(oprot);
+        }
       }
 
       @Override
       public void read(org.apache.thrift.protocol.TProtocol prot, replicateLog_args struct) throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(2);
+        BitSet incoming = iprot.readBitSet(3);
         if (incoming.get(0)) {
           struct.remoteTableId = iprot.readI32();
           struct.setRemoteTableIdIsSet(true);
@@ -748,6 +856,11 @@ import org.slf4j.LoggerFactory;
           struct.data.read(iprot);
           struct.setDataIsSet(true);
         }
+        if (incoming.get(2)) {
+          struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+          struct.credentials.read(iprot);
+          struct.setCredentialsIsSet(true);
+        }
       }
     }
 
@@ -1214,6 +1327,7 @@ import org.slf4j.LoggerFactory;
 
     private static final org.apache.thrift.protocol.TField REMOTE_TABLE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("remoteTableId", org.apache.thrift.protocol.TType.I32, (short)1);
     private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+    private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)3);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
@@ -1223,11 +1337,13 @@ import org.slf4j.LoggerFactory;
 
     public int remoteTableId; // required
     public KeyValues data; // required
+    public org.apache.accumulo.core.security.thrift.TCredentials credentials; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
       REMOTE_TABLE_ID((short)1, "remoteTableId"),
-      DATA((short)2, "data");
+      DATA((short)2, "data"),
+      CREDENTIALS((short)3, "credentials");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -1246,6 +1362,8 @@ import org.slf4j.LoggerFactory;
             return REMOTE_TABLE_ID;
           case 2: // DATA
             return DATA;
+          case 3: // CREDENTIALS
+            return CREDENTIALS;
           default:
             return null;
         }
@@ -1295,6 +1413,8 @@ import org.slf4j.LoggerFactory;
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
       tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, KeyValues.class)));
+      tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.security.thrift.TCredentials.class)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(replicateKeyValues_args.class, metaDataMap);
     }
@@ -1304,12 +1424,14 @@ import org.slf4j.LoggerFactory;
 
     public replicateKeyValues_args(
       int remoteTableId,
-      KeyValues data)
+      KeyValues data,
+      org.apache.accumulo.core.security.thrift.TCredentials credentials)
     {
       this();
       this.remoteTableId = remoteTableId;
       setRemoteTableIdIsSet(true);
       this.data = data;
+      this.credentials = credentials;
     }
 
     /**
@@ -1321,6 +1443,9 @@ import org.slf4j.LoggerFactory;
       if (other.isSetData()) {
         this.data = new KeyValues(other.data);
       }
+      if (other.isSetCredentials()) {
+        this.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(other.credentials);
+      }
     }
 
     public replicateKeyValues_args deepCopy() {
@@ -1332,6 +1457,7 @@ import org.slf4j.LoggerFactory;
       setRemoteTableIdIsSet(false);
       this.remoteTableId = 0;
       this.data = null;
+      this.credentials = null;
     }
 
     public int getRemoteTableId() {
@@ -1381,6 +1507,30 @@ import org.slf4j.LoggerFactory;
       }
     }
 
+    public org.apache.accumulo.core.security.thrift.TCredentials getCredentials() {
+      return this.credentials;
+    }
+
+    public replicateKeyValues_args setCredentials(org.apache.accumulo.core.security.thrift.TCredentials credentials) {
+      this.credentials = credentials;
+      return this;
+    }
+
+    public void unsetCredentials() {
+      this.credentials = null;
+    }
+
+    /** Returns true if field credentials is set (has been assigned a value) and false otherwise */
+    public boolean isSetCredentials() {
+      return this.credentials != null;
+    }
+
+    public void setCredentialsIsSet(boolean value) {
+      if (!value) {
+        this.credentials = null;
+      }
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case REMOTE_TABLE_ID:
@@ -1399,6 +1549,14 @@ import org.slf4j.LoggerFactory;
         }
         break;
 
+      case CREDENTIALS:
+        if (value == null) {
+          unsetCredentials();
+        } else {
+          setCredentials((org.apache.accumulo.core.security.thrift.TCredentials)value);
+        }
+        break;
+
       }
     }
 
@@ -1410,6 +1568,9 @@ import org.slf4j.LoggerFactory;
       case DATA:
         return getData();
 
+      case CREDENTIALS:
+        return getCredentials();
+
       }
       throw new IllegalStateException();
     }
@@ -1425,6 +1586,8 @@ import org.slf4j.LoggerFactory;
         return isSetRemoteTableId();
       case DATA:
         return isSetData();
+      case CREDENTIALS:
+        return isSetCredentials();
       }
       throw new IllegalStateException();
     }
@@ -1460,6 +1623,15 @@ import org.slf4j.LoggerFactory;
           return false;
       }
 
+      boolean this_present_credentials = true && this.isSetCredentials();
+      boolean that_present_credentials = true && that.isSetCredentials();
+      if (this_present_credentials || that_present_credentials) {
+        if (!(this_present_credentials && that_present_credentials))
+          return false;
+        if (!this.credentials.equals(that.credentials))
+          return false;
+      }
+
       return true;
     }
 
@@ -1496,6 +1668,16 @@ import org.slf4j.LoggerFactory;
           return lastComparison;
         }
       }
+      lastComparison = Boolean.valueOf(isSetCredentials()).compareTo(typedOther.isSetCredentials());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetCredentials()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, typedOther.credentials);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -1527,6 +1709,14 @@ import org.slf4j.LoggerFactory;
         sb.append(this.data);
       }
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("credentials:");
+      if (this.credentials == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.credentials);
+      }
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -1537,6 +1727,9 @@ import org.slf4j.LoggerFactory;
       if (data != null) {
         data.validate();
       }
+      if (credentials != null) {
+        credentials.validate();
+      }
     }
 
     private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -1592,6 +1785,15 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
+            case 3: // CREDENTIALS
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+                struct.credentials.read(iprot);
+                struct.setCredentialsIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -1615,6 +1817,11 @@ import org.slf4j.LoggerFactory;
           struct.data.write(oprot);
           oprot.writeFieldEnd();
         }
+        if (struct.credentials != null) {
+          oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC);
+          struct.credentials.write(oprot);
+          oprot.writeFieldEnd();
+        }
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -1639,19 +1846,25 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetData()) {
           optionals.set(1);
         }
-        oprot.writeBitSet(optionals, 2);
+        if (struct.isSetCredentials()) {
+          optionals.set(2);
+        }
+        oprot.writeBitSet(optionals, 3);
         if (struct.isSetRemoteTableId()) {
           oprot.writeI32(struct.remoteTableId);
         }
         if (struct.isSetData()) {
           struct.data.write(oprot);
         }
+        if (struct.isSetCredentials()) {
+          struct.credentials.write(oprot);
+        }
       }
 
       @Override
       public void read(org.apache.thrift.protocol.TProtocol prot, replicateKeyValues_args struct) throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(2);
+        BitSet incoming = iprot.readBitSet(3);
         if (incoming.get(0)) {
           struct.remoteTableId = iprot.readI32();
           struct.setRemoteTableIdIsSet(true);
@@ -1661,6 +1874,11 @@ import org.slf4j.LoggerFactory;
           struct.data.read(iprot);
           struct.setDataIsSet(true);
         }
+        if (incoming.get(2)) {
+          struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+          struct.credentials.read(iprot);
+          struct.setCredentialsIsSet(true);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/thrift/replication.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/replication.thrift b/core/src/main/thrift/replication.thrift
index a5d1836..392e913 100644
--- a/core/src/main/thrift/replication.thrift
+++ b/core/src/main/thrift/replication.thrift
@@ -19,6 +19,7 @@ namespace java org.apache.accumulo.core.replication.thrift
 namespace cpp org.apache.accumulo.core.replication.thrift
 
 include "data.thrift"
+include "security.thrift"
 
 struct WalEdits {
     1:list<binary> edits
@@ -28,21 +29,35 @@ struct KeyValues {
     1:list<data.TKeyValue> keyValues
 }
 
-exception RemoteCoordinationException {
-    1:i32 code,
+enum RemoteReplicationErrorCode {
+  COULD_NOT_DESERIALIZE
+  COULD_NOT_APPLY
+  TABLE_DOES_NOT_EXIST
+  CANNOT_AUTHENTICATE
+  CANNOT_INSTANTIATE_REPLAYER
+}
+
+enum ReplicationCoordinatorErrorCode {
+  NO_AVAILABLE_SERVERS
+  SERVICE_CONFIGURATION_UNAVAILABLE
+  CANNOT_AUTHENTICATE
+}
+
+exception ReplicationCoordinatorException {
+    1:ReplicationCoordinatorErrorCode code,
     2:string reason
 }
 
 exception RemoteReplicationException {
-    1:i32 code,
+    1:RemoteReplicationErrorCode code,
     2:string reason
 }
 
 service ReplicationCoordinator {
-	string getServicerAddress(1:i32 remoteTableId) throws (1:RemoteCoordinationException e),
+	string getServicerAddress(1:i32 remoteTableId, 2:security.TCredentials credentials) throws (1:ReplicationCoordinatorException e),
 }
 
 service ReplicationServicer {
-    i64 replicateLog(1:i32 remoteTableId, 2:WalEdits data) throws (1:RemoteReplicationException e),
-    i64 replicateKeyValues(1:i32 remoteTableId, 2:KeyValues data) throws (1:RemoteReplicationException e)
+    i64 replicateLog(1:i32 remoteTableId, 2:WalEdits data, 3:security.TCredentials credentials) throws (1:RemoteReplicationException e),
+    i64 replicateKeyValues(1:i32 remoteTableId, 2:KeyValues data, 3:security.TCredentials credentials) throws (1:RemoteReplicationException e)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
index 9331075..974aaa9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
@@ -23,13 +23,17 @@ import java.util.Set;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.replication.ReplicationCoordinatorErrorCode;
-import org.apache.accumulo.core.replication.thrift.RemoteCoordinationException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
+import org.apache.accumulo.core.replication.thrift.ReplicationCoordinatorErrorCode;
+import org.apache.accumulo.core.replication.thrift.ReplicationCoordinatorException;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -47,6 +51,7 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac
   private final Instance inst;
   private final Random rand;
   private final ZooReader reader;
+  private final SecurityOperation security;
 
   public MasterReplicationCoordinator(Master master) {
     this(master, new ZooReader(master.getInstance().getZooKeepers(), master.getInstance().getZooKeepersSessionTimeOut()));
@@ -57,15 +62,22 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac
     this.rand = new Random(358923462l);
     this.inst = master.getInstance();
     this.reader = reader;
-    
+    this.security = SecurityOperation.getInstance(inst.getInstanceID(), false);
   }
 
 
   @Override
-  public String getServicerAddress(int remoteTableId) throws RemoteCoordinationException, TException {
+  public String getServicerAddress(int remoteTableId, TCredentials creds) throws ReplicationCoordinatorException, TException {
+    try { 
+      security.authenticateUser(SystemCredentials.get().toThrift(inst), creds);
+    } catch (ThriftSecurityException e) {
+      log.error("{} failed to authenticate for replication to {}", creds.getPrincipal(), remoteTableId);
+      throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.CANNOT_AUTHENTICATE, "Could not authenticate " + creds.getPrincipal());
+    }
+
     Set<TServerInstance> tservers = master.onlineTabletServers();
     if (tservers.isEmpty()) {
-      throw new RemoteCoordinationException(ReplicationCoordinatorErrorCode.NO_AVAILABLE_SERVERS.ordinal(), "No tservers are available for replication");
+      throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.NO_AVAILABLE_SERVERS, "No tservers are available for replication");
     }
 
     TServerInstance tserver = getRandomTServer(tservers, rand.nextInt(tservers.size()));
@@ -74,7 +86,7 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac
       replServiceAddr = new String(reader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver.hostPort(), null), StandardCharsets.UTF_8);
     } catch (KeeperException | InterruptedException e) {
       log.error("Could not fetch repliation service port for tserver", e);
-      throw new RemoteCoordinationException(ReplicationCoordinatorErrorCode.SERVICE_CONFIGURATION_UNAVAILABLE.ordinal(),
+      throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.SERVICE_CONFIGURATION_UNAVAILABLE,
           "Could not determine port for replication service running at " + tserver.hostPort());
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index ca1382f..c6b266f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -34,6 +35,7 @@ import org.apache.accumulo.core.client.impl.ClientExecReturn;
 import org.apache.accumulo.core.client.impl.ReplicationClient;
 import org.apache.accumulo.core.client.impl.ServerConfigurationUtil;
 import org.apache.accumulo.core.client.replication.ReplicaSystem;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
@@ -45,6 +47,8 @@ import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client;
 import org.apache.accumulo.core.replication.thrift.WalEdits;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -137,8 +141,10 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
   @Override
   public Status replicate(final Path p, final Status status, final ReplicationTarget target) {
-    Instance localInstance = HdfsZooInstance.getInstance();
-    AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance);
+    final Instance localInstance = HdfsZooInstance.getInstance();
+    final AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance);
+    Credentials credentialsForPeer = getCredentialsForPeer(localConf, target);
+    final TCredentials tCredsForPeer = credentialsForPeer.toThrift(localInstance);
 
     Instance peerInstance = getPeerInstance(target);
     // Remote identifier is an integer (table id) in this case.
@@ -154,7 +160,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
           @Override
           public String execute(ReplicationCoordinator.Client client) throws Exception {
-            return client.getServicerAddress(remoteTableId);
+            return client.getServicerAddress(remoteTableId, tCredsForPeer);
           }
 
         });
@@ -184,7 +190,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
                 if (p.getName().endsWith(RFILE_SUFFIX)) {
                   RFileReplication kvs = getKeyValues(target, p, status, sizeLimit);
                   if (0 < kvs.keyValues.getKeyValuesSize()) {
-                    long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues);
+                    long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues, tCredsForPeer);
                     if (entriesReplicated != kvs.keyValues.getKeyValuesSize()) {
                       log.warn("Sent {} KeyValue entries for replication but only {} were reported as replicated", kvs.keyValues.getKeyValuesSize(),
                           entriesReplicated);
@@ -198,7 +204,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
                   // If we have some edits to send
                   if (0 < edits.walEdits.getEditsSize()) {
-                    long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits);
+                    long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tCredsForPeer);
                     if (entriesReplicated != edits.numUpdates) {
                       log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated);
                     }
@@ -241,6 +247,24 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     return status;
   }
 
+  protected Credentials getCredentialsForPeer(AccumuloConfiguration conf, ReplicationTarget target) {
+    Preconditions.checkNotNull(conf);
+    Preconditions.checkNotNull(target);
+
+    String peerName = target.getPeerName();
+    String userKey = Property.REPLICATION_PEER_USER.getKey() + peerName, passwordKey = Property.REPLICATION_PEER_PASSWORD.getKey() + peerName;
+    Map<String,String> peerUsers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_USER);
+    Map<String,String> peerPasswords = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_PASSWORD);
+
+    String user = peerUsers.get(userKey);
+    String password = peerPasswords.get(passwordKey);
+    if (null == user || null == password) {
+      throw new IllegalArgumentException(userKey + " and " + passwordKey + " not configured, cannot replicate");
+    }
+
+    return new Credentials(user, new PasswordToken(password));
+  }
+
   protected Instance getPeerInstance(ReplicationTarget target) {
     return new ZooKeeperInstance(instanceName, zookeepers);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
index ea50199..8b1a402 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
@@ -29,8 +29,8 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.replication.AccumuloReplicationReplayer;
-import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;
 import org.apache.accumulo.core.replication.thrift.KeyValues;
+import org.apache.accumulo.core.replication.thrift.RemoteReplicationErrorCode;
 import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
 import org.apache.accumulo.core.replication.thrift.WalEdits;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -64,7 +64,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
           value.readFields(dis);
         } catch (IOException e) {
           log.error("Could not deserialize edit from stream", e);
-          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_DESERIALIZE.ordinal(), "Could not deserialize edit from stream");
+          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_DESERIALIZE, "Could not deserialize edit from stream");
         }
 
         // Create the batchScanner if we don't already have one.
@@ -74,7 +74,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
           try {
             bw = conn.createBatchWriter(tableName, bwConfig);
           } catch (TableNotFoundException e) {
-            throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table " + tableName + " does not exist");
+            throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST, "Table " + tableName + " does not exist");
           }
         }
 
@@ -84,7 +84,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
           bw.addMutations(value.mutations);
         } catch (MutationsRejectedException e) {
           log.error("Could not apply mutations to {}", tableName);
-          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + tableName);
+          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY, "Could not apply mutations to " + tableName);
         }
 
         mutationsApplied += value.mutations.size();
@@ -95,7 +95,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
           bw.close();
         } catch (MutationsRejectedException e) {
           log.error("Could not apply mutations to {}", tableName);
-          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + tableName);
+          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY, "Could not apply mutations to " + tableName);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
index 820c586..3a9bf9b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
@@ -28,13 +28,13 @@ import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.replication.AccumuloReplicationReplayer;
-import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;
 import org.apache.accumulo.core.replication.thrift.KeyValues;
+import org.apache.accumulo.core.replication.thrift.RemoteReplicationErrorCode;
 import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Iface;
 import org.apache.accumulo.core.replication.thrift.WalEdits;
 import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,11 +52,11 @@ public class ReplicationServicerHandler implements Iface {
   }
 
   @Override
-  public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, TException {
+  public long replicateLog(int remoteTableId, WalEdits data, TCredentials tcreds) throws RemoteReplicationException, TException {
     log.debug("Got replication request to tableID {} with {} edits", remoteTableId, data.getEditsSize());
 
     String tableId = Integer.toString(remoteTableId);
-    Credentials creds = SystemCredentials.get();
+    Credentials creds = Credentials.fromThrift(tcreds);
     Connector conn;
     String tableName;
 
@@ -64,14 +64,14 @@ public class ReplicationServicerHandler implements Iface {
       conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
     } catch (AccumuloException | AccumuloSecurityException e) {
       log.error("Could not get connection", e);
-      throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_AUTHENTICATE.ordinal(), "Cannot get connector");
+      throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_AUTHENTICATE, "Cannot get connector as " + creds.getPrincipal());
     }
 
     try {
       tableName = Tables.getTableName(inst, tableId);
     } catch (TableNotFoundException e) {
       log.error("Could not find table with id {}", tableId);
-      throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table with id " + tableId + " does not exist");
+      throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST, "Table with id " + tableId + " does not exist");
     }
 
     AccumuloConfiguration conf = ServerConfigurationUtil.getConfiguration(inst);
@@ -96,7 +96,7 @@ public class ReplicationServicerHandler implements Iface {
       clz = untypedClz.asSubclass(AccumuloReplicationReplayer.class);
     } catch (ClassNotFoundException e) {
       log.error("Could not instantiate replayer class {}", handlerClassForTable, e);
-      throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER.ordinal(), "Could not instantiate replayer class "
+      throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER, "Could not instantiate replayer class "
           + handlerClassForTable);
     }
 
@@ -106,7 +106,7 @@ public class ReplicationServicerHandler implements Iface {
       replayer = clz.newInstance();
     } catch (InstantiationException | IllegalAccessException e1) {
       log.error("Could not instantiate replayer class {}", clz.getName());
-      throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER.ordinal(), "Could not instantiate replayer class"
+      throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER, "Could not instantiate replayer class"
           + clz.getName());
     }
 
@@ -116,7 +116,7 @@ public class ReplicationServicerHandler implements Iface {
   }
 
   @Override
-  public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, TException {
+  public long replicateKeyValues(int remoteTableId, KeyValues data, TCredentials creds) throws RemoteReplicationException, TException {
     throw new UnsupportedOperationException();
   }
 


Mime
View raw message