hbase-commits mailing list archives

From apurt...@apache.org
Subject [2/3] hbase git commit: HBASE-11992 Backport HBASE-11367 (Pluggable replication endpoint) to 0.98 (Ramkrishna S. Vasudevan)
Date Sat, 22 Nov 2014 17:32:00 GMT
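This patch backports HBASE-11367's pluggable replication endpoints to 0.98: a replication peer can now name a custom ReplicationEndpoint implementation and carry per-peer data and configuration, with the new BaseReplicationEndpoint class (added below) as a starting point for implementors. As a rough sketch of what the backport enables, assuming the fluent ReplicationPeerConfig setters and the three-argument ReplicationAdmin.addPeer() from HBASE-11367 (org.example.MyReplicationEndpoint is a hypothetical user class):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    Configuration conf = HBaseConfiguration.create();
    // Point peer "1" at the slave cluster and a user-supplied endpoint class.
    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
        .setClusterKey("zk1.example.com:2181:/hbase")
        .setReplicationEndpointImpl("org.example.MyReplicationEndpoint");
    ReplicationAdmin admin = new ReplicationAdmin(conf);
    admin.addPeer("1", peerConfig, null);  // null table-CF map = replicate everything
    admin.close();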
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 9d037f5..a1caf87 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -4828,6 +4828,71 @@ public final class ZooKeeperProtos {
      */
     com.google.protobuf.ByteString
         getClusterkeyBytes();
+
+    // optional string replicationEndpointImpl = 2;
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    boolean hasReplicationEndpointImpl();
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    java.lang.String getReplicationEndpointImpl();
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getReplicationEndpointImplBytes();
+
+    // repeated .BytesBytesPair data = 3;
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> 
+        getDataList();
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair getData(int index);
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    int getDataCount();
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> 
+        getDataOrBuilderList();
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder getDataOrBuilder(
+        int index);
+
+    // repeated .NameStringPair configuration = 4;
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> 
+        getConfigurationList();
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair getConfiguration(int index);
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    int getConfigurationCount();
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> 
+        getConfigurationOrBuilderList();
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder getConfigurationOrBuilder(
+        int index);
   }
   /**
    * Protobuf type {@code ReplicationPeer}
@@ -4890,6 +4955,27 @@ public final class ZooKeeperProtos {
               clusterkey_ = input.readBytes();
               break;
             }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              replicationEndpointImpl_ = input.readBytes();
+              break;
+            }
+            case 26: {
+              if (!((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+                data_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair>();
+                mutable_bitField0_ |= 0x00000004;
+              }
+              data_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.PARSER, extensionRegistry));
+              break;
+            }
+            case 34: {
+              if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+                configuration_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair>();
+                mutable_bitField0_ |= 0x00000008;
+              }
+              configuration_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.PARSER, extensionRegistry));
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4898,6 +4984,12 @@ public final class ZooKeeperProtos {
         throw new com.google.protobuf.InvalidProtocolBufferException(
             e.getMessage()).setUnfinishedMessage(this);
       } finally {
+        if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+          data_ = java.util.Collections.unmodifiableList(data_);
+        }
+        if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+          configuration_ = java.util.Collections.unmodifiableList(configuration_);
+        }
         this.unknownFields = unknownFields.build();
         makeExtensionsImmutable();
       }
@@ -4988,8 +5080,126 @@ public final class ZooKeeperProtos {
       }
     }
 
+    // optional string replicationEndpointImpl = 2;
+    public static final int REPLICATIONENDPOINTIMPL_FIELD_NUMBER = 2;
+    private java.lang.Object replicationEndpointImpl_;
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    public boolean hasReplicationEndpointImpl() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    public java.lang.String getReplicationEndpointImpl() {
+      java.lang.Object ref = replicationEndpointImpl_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          replicationEndpointImpl_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getReplicationEndpointImplBytes() {
+      java.lang.Object ref = replicationEndpointImpl_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        replicationEndpointImpl_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // repeated .BytesBytesPair data = 3;
+    public static final int DATA_FIELD_NUMBER = 3;
+    private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> data_;
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> getDataList() {
+      return data_;
+    }
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> 
+        getDataOrBuilderList() {
+      return data_;
+    }
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    public int getDataCount() {
+      return data_.size();
+    }
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair getData(int index) {
+      return data_.get(index);
+    }
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder getDataOrBuilder(
+        int index) {
+      return data_.get(index);
+    }
+
+    // repeated .NameStringPair configuration = 4;
+    public static final int CONFIGURATION_FIELD_NUMBER = 4;
+    private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> configuration_;
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> getConfigurationList() {
+      return configuration_;
+    }
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> 
+        getConfigurationOrBuilderList() {
+      return configuration_;
+    }
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    public int getConfigurationCount() {
+      return configuration_.size();
+    }
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair getConfiguration(int index) {
+      return configuration_.get(index);
+    }
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder getConfigurationOrBuilder(
+        int index) {
+      return configuration_.get(index);
+    }
+
     private void initFields() {
       clusterkey_ = "";
+      replicationEndpointImpl_ = "";
+      data_ = java.util.Collections.emptyList();
+      configuration_ = java.util.Collections.emptyList();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5000,6 +5210,18 @@ public final class ZooKeeperProtos {
         memoizedIsInitialized = 0;
         return false;
       }
+      for (int i = 0; i < getDataCount(); i++) {
+        if (!getData(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      for (int i = 0; i < getConfigurationCount(); i++) {
+        if (!getConfiguration(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -5010,6 +5232,15 @@ public final class ZooKeeperProtos {
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         output.writeBytes(1, getClusterkeyBytes());
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getReplicationEndpointImplBytes());
+      }
+      for (int i = 0; i < data_.size(); i++) {
+        output.writeMessage(3, data_.get(i));
+      }
+      for (int i = 0; i < configuration_.size(); i++) {
+        output.writeMessage(4, configuration_.get(i));
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -5023,6 +5254,18 @@ public final class ZooKeeperProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(1, getClusterkeyBytes());
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getReplicationEndpointImplBytes());
+      }
+      for (int i = 0; i < data_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(3, data_.get(i));
+      }
+      for (int i = 0; i < configuration_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(4, configuration_.get(i));
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5051,6 +5294,15 @@ public final class ZooKeeperProtos {
         result = result && getClusterkey()
             .equals(other.getClusterkey());
       }
+      result = result && (hasReplicationEndpointImpl() == other.hasReplicationEndpointImpl());
+      if (hasReplicationEndpointImpl()) {
+        result = result && getReplicationEndpointImpl()
+            .equals(other.getReplicationEndpointImpl());
+      }
+      result = result && getDataList()
+          .equals(other.getDataList());
+      result = result && getConfigurationList()
+          .equals(other.getConfigurationList());
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5068,6 +5320,18 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + CLUSTERKEY_FIELD_NUMBER;
         hash = (53 * hash) + getClusterkey().hashCode();
       }
+      if (hasReplicationEndpointImpl()) {
+        hash = (37 * hash) + REPLICATIONENDPOINTIMPL_FIELD_NUMBER;
+        hash = (53 * hash) + getReplicationEndpointImpl().hashCode();
+      }
+      if (getDataCount() > 0) {
+        hash = (37 * hash) + DATA_FIELD_NUMBER;
+        hash = (53 * hash) + getDataList().hashCode();
+      }
+      if (getConfigurationCount() > 0) {
+        hash = (37 * hash) + CONFIGURATION_FIELD_NUMBER;
+        hash = (53 * hash) + getConfigurationList().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5174,6 +5438,8 @@ public final class ZooKeeperProtos {
       }
       private void maybeForceBuilderInitialization() {
         if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+          getDataFieldBuilder();
+          getConfigurationFieldBuilder();
         }
       }
       private static Builder create() {
@@ -5184,6 +5450,20 @@ public final class ZooKeeperProtos {
         super.clear();
         clusterkey_ = "";
         bitField0_ = (bitField0_ & ~0x00000001);
+        replicationEndpointImpl_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        if (dataBuilder_ == null) {
+          data_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000004);
+        } else {
+          dataBuilder_.clear();
+        }
+        if (configurationBuilder_ == null) {
+          configuration_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000008);
+        } else {
+          configurationBuilder_.clear();
+        }
         return this;
       }
 
@@ -5216,6 +5496,28 @@ public final class ZooKeeperProtos {
           to_bitField0_ |= 0x00000001;
         }
         result.clusterkey_ = clusterkey_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.replicationEndpointImpl_ = replicationEndpointImpl_;
+        if (dataBuilder_ == null) {
+          if (((bitField0_ & 0x00000004) == 0x00000004)) {
+            data_ = java.util.Collections.unmodifiableList(data_);
+            bitField0_ = (bitField0_ & ~0x00000004);
+          }
+          result.data_ = data_;
+        } else {
+          result.data_ = dataBuilder_.build();
+        }
+        if (configurationBuilder_ == null) {
+          if (((bitField0_ & 0x00000008) == 0x00000008)) {
+            configuration_ = java.util.Collections.unmodifiableList(configuration_);
+            bitField0_ = (bitField0_ & ~0x00000008);
+          }
+          result.configuration_ = configuration_;
+        } else {
+          result.configuration_ = configurationBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5237,6 +5539,63 @@ public final class ZooKeeperProtos {
           clusterkey_ = other.clusterkey_;
           onChanged();
         }
+        if (other.hasReplicationEndpointImpl()) {
+          bitField0_ |= 0x00000002;
+          replicationEndpointImpl_ = other.replicationEndpointImpl_;
+          onChanged();
+        }
+        if (dataBuilder_ == null) {
+          if (!other.data_.isEmpty()) {
+            if (data_.isEmpty()) {
+              data_ = other.data_;
+              bitField0_ = (bitField0_ & ~0x00000004);
+            } else {
+              ensureDataIsMutable();
+              data_.addAll(other.data_);
+            }
+            onChanged();
+          }
+        } else {
+          if (!other.data_.isEmpty()) {
+            if (dataBuilder_.isEmpty()) {
+              dataBuilder_.dispose();
+              dataBuilder_ = null;
+              data_ = other.data_;
+              bitField0_ = (bitField0_ & ~0x00000004);
+              dataBuilder_ = 
+                com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+                   getDataFieldBuilder() : null;
+            } else {
+              dataBuilder_.addAllMessages(other.data_);
+            }
+          }
+        }
+        if (configurationBuilder_ == null) {
+          if (!other.configuration_.isEmpty()) {
+            if (configuration_.isEmpty()) {
+              configuration_ = other.configuration_;
+              bitField0_ = (bitField0_ & ~0x00000008);
+            } else {
+              ensureConfigurationIsMutable();
+              configuration_.addAll(other.configuration_);
+            }
+            onChanged();
+          }
+        } else {
+          if (!other.configuration_.isEmpty()) {
+            if (configurationBuilder_.isEmpty()) {
+              configurationBuilder_.dispose();
+              configurationBuilder_ = null;
+              configuration_ = other.configuration_;
+              bitField0_ = (bitField0_ & ~0x00000008);
+              configurationBuilder_ = 
+                com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+                   getConfigurationFieldBuilder() : null;
+            } else {
+              configurationBuilder_.addAllMessages(other.configuration_);
+            }
+          }
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -5246,6 +5605,18 @@ public final class ZooKeeperProtos {
           
           return false;
         }
+        for (int i = 0; i < getDataCount(); i++) {
+          if (!getData(i).isInitialized()) {
+            
+            return false;
+          }
+        }
+        for (int i = 0; i < getConfigurationCount(); i++) {
+          if (!getConfiguration(i).isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
 
@@ -5372,6 +5743,560 @@ public final class ZooKeeperProtos {
         return this;
       }
 
+      // optional string replicationEndpointImpl = 2;
+      private java.lang.Object replicationEndpointImpl_ = "";
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public boolean hasReplicationEndpointImpl() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public java.lang.String getReplicationEndpointImpl() {
+        java.lang.Object ref = replicationEndpointImpl_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          replicationEndpointImpl_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getReplicationEndpointImplBytes() {
+        java.lang.Object ref = replicationEndpointImpl_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          replicationEndpointImpl_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public Builder setReplicationEndpointImpl(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        replicationEndpointImpl_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public Builder clearReplicationEndpointImpl() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        replicationEndpointImpl_ = getDefaultInstance().getReplicationEndpointImpl();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public Builder setReplicationEndpointImplBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        replicationEndpointImpl_ = value;
+        onChanged();
+        return this;
+      }
+
+      // repeated .BytesBytesPair data = 3;
+      private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> data_ =
+        java.util.Collections.emptyList();
+      private void ensureDataIsMutable() {
+        if (!((bitField0_ & 0x00000004) == 0x00000004)) {
+          data_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair>(data_);
+          bitField0_ |= 0x00000004;
+         }
+      }
+
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> dataBuilder_;
+
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> getDataList() {
+        if (dataBuilder_ == null) {
+          return java.util.Collections.unmodifiableList(data_);
+        } else {
+          return dataBuilder_.getMessageList();
+        }
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public int getDataCount() {
+        if (dataBuilder_ == null) {
+          return data_.size();
+        } else {
+          return dataBuilder_.getCount();
+        }
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair getData(int index) {
+        if (dataBuilder_ == null) {
+          return data_.get(index);
+        } else {
+          return dataBuilder_.getMessage(index);
+        }
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder setData(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair value) {
+        if (dataBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureDataIsMutable();
+          data_.set(index, value);
+          onChanged();
+        } else {
+          dataBuilder_.setMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder setData(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder builderForValue) {
+        if (dataBuilder_ == null) {
+          ensureDataIsMutable();
+          data_.set(index, builderForValue.build());
+          onChanged();
+        } else {
+          dataBuilder_.setMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder addData(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair value) {
+        if (dataBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureDataIsMutable();
+          data_.add(value);
+          onChanged();
+        } else {
+          dataBuilder_.addMessage(value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder addData(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair value) {
+        if (dataBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureDataIsMutable();
+          data_.add(index, value);
+          onChanged();
+        } else {
+          dataBuilder_.addMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder addData(
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder builderForValue) {
+        if (dataBuilder_ == null) {
+          ensureDataIsMutable();
+          data_.add(builderForValue.build());
+          onChanged();
+        } else {
+          dataBuilder_.addMessage(builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder addData(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder builderForValue) {
+        if (dataBuilder_ == null) {
+          ensureDataIsMutable();
+          data_.add(index, builderForValue.build());
+          onChanged();
+        } else {
+          dataBuilder_.addMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder addAllData(
+          java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> values) {
+        if (dataBuilder_ == null) {
+          ensureDataIsMutable();
+          super.addAll(values, data_);
+          onChanged();
+        } else {
+          dataBuilder_.addAllMessages(values);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder clearData() {
+        if (dataBuilder_ == null) {
+          data_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000004);
+          onChanged();
+        } else {
+          dataBuilder_.clear();
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder removeData(int index) {
+        if (dataBuilder_ == null) {
+          ensureDataIsMutable();
+          data_.remove(index);
+          onChanged();
+        } else {
+          dataBuilder_.remove(index);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder getDataBuilder(
+          int index) {
+        return getDataFieldBuilder().getBuilder(index);
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder getDataOrBuilder(
+          int index) {
+        if (dataBuilder_ == null) {
+          return data_.get(index);  } else {
+          return dataBuilder_.getMessageOrBuilder(index);
+        }
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> 
+           getDataOrBuilderList() {
+        if (dataBuilder_ != null) {
+          return dataBuilder_.getMessageOrBuilderList();
+        } else {
+          return java.util.Collections.unmodifiableList(data_);
+        }
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder addDataBuilder() {
+        return getDataFieldBuilder().addBuilder(
+            org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder addDataBuilder(
+          int index) {
+        return getDataFieldBuilder().addBuilder(
+            index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder> 
+           getDataBuilderList() {
+        return getDataFieldBuilder().getBuilderList();
+      }
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> 
+          getDataFieldBuilder() {
+        if (dataBuilder_ == null) {
+          dataBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder>(
+                  data_,
+                  ((bitField0_ & 0x00000004) == 0x00000004),
+                  getParentForChildren(),
+                  isClean());
+          data_ = null;
+        }
+        return dataBuilder_;
+      }
+
+      // repeated .NameStringPair configuration = 4;
+      private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> configuration_ =
+        java.util.Collections.emptyList();
+      private void ensureConfigurationIsMutable() {
+        if (!((bitField0_ & 0x00000008) == 0x00000008)) {
+          configuration_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair>(configuration_);
+          bitField0_ |= 0x00000008;
+         }
+      }
+
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> configurationBuilder_;
+
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> getConfigurationList() {
+        if (configurationBuilder_ == null) {
+          return java.util.Collections.unmodifiableList(configuration_);
+        } else {
+          return configurationBuilder_.getMessageList();
+        }
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public int getConfigurationCount() {
+        if (configurationBuilder_ == null) {
+          return configuration_.size();
+        } else {
+          return configurationBuilder_.getCount();
+        }
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair getConfiguration(int index) {
+        if (configurationBuilder_ == null) {
+          return configuration_.get(index);
+        } else {
+          return configurationBuilder_.getMessage(index);
+        }
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder setConfiguration(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair value) {
+        if (configurationBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureConfigurationIsMutable();
+          configuration_.set(index, value);
+          onChanged();
+        } else {
+          configurationBuilder_.setMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder setConfiguration(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder builderForValue) {
+        if (configurationBuilder_ == null) {
+          ensureConfigurationIsMutable();
+          configuration_.set(index, builderForValue.build());
+          onChanged();
+        } else {
+          configurationBuilder_.setMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder addConfiguration(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair value) {
+        if (configurationBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureConfigurationIsMutable();
+          configuration_.add(value);
+          onChanged();
+        } else {
+          configurationBuilder_.addMessage(value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder addConfiguration(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair value) {
+        if (configurationBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureConfigurationIsMutable();
+          configuration_.add(index, value);
+          onChanged();
+        } else {
+          configurationBuilder_.addMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder addConfiguration(
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder builderForValue) {
+        if (configurationBuilder_ == null) {
+          ensureConfigurationIsMutable();
+          configuration_.add(builderForValue.build());
+          onChanged();
+        } else {
+          configurationBuilder_.addMessage(builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder addConfiguration(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder builderForValue) {
+        if (configurationBuilder_ == null) {
+          ensureConfigurationIsMutable();
+          configuration_.add(index, builderForValue.build());
+          onChanged();
+        } else {
+          configurationBuilder_.addMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder addAllConfiguration(
+          java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> values) {
+        if (configurationBuilder_ == null) {
+          ensureConfigurationIsMutable();
+          super.addAll(values, configuration_);
+          onChanged();
+        } else {
+          configurationBuilder_.addAllMessages(values);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder clearConfiguration() {
+        if (configurationBuilder_ == null) {
+          configuration_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000008);
+          onChanged();
+        } else {
+          configurationBuilder_.clear();
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder removeConfiguration(int index) {
+        if (configurationBuilder_ == null) {
+          ensureConfigurationIsMutable();
+          configuration_.remove(index);
+          onChanged();
+        } else {
+          configurationBuilder_.remove(index);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder getConfigurationBuilder(
+          int index) {
+        return getConfigurationFieldBuilder().getBuilder(index);
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder getConfigurationOrBuilder(
+          int index) {
+        if (configurationBuilder_ == null) {
+          return configuration_.get(index);  } else {
+          return configurationBuilder_.getMessageOrBuilder(index);
+        }
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> 
+           getConfigurationOrBuilderList() {
+        if (configurationBuilder_ != null) {
+          return configurationBuilder_.getMessageOrBuilderList();
+        } else {
+          return java.util.Collections.unmodifiableList(configuration_);
+        }
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder addConfigurationBuilder() {
+        return getConfigurationFieldBuilder().addBuilder(
+            org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder addConfigurationBuilder(
+          int index) {
+        return getConfigurationFieldBuilder().addBuilder(
+            index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder> 
+           getConfigurationBuilderList() {
+        return getConfigurationFieldBuilder().getBuilderList();
+      }
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> 
+          getConfigurationFieldBuilder() {
+        if (configurationBuilder_ == null) {
+          configurationBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder>(
+                  configuration_,
+                  ((bitField0_ & 0x00000008) == 0x00000008),
+                  getParentForChildren(),
+                  isClean());
+          configuration_ = null;
+        }
+        return configurationBuilder_;
+      }
+
       // @@protoc_insertion_point(builder_scope:ReplicationPeer)
     }
 
@@ -9598,23 +10523,25 @@ public final class ZooKeeperProtos {
       "NKNOWN\020\000\022\021\n\rLOG_SPLITTING\020\001\022\016\n\nLOG_REPLA" +
       "Y\020\002\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.Stat" +
       "e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
-      "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"%\n\017R" +
-      "eplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"^\n\020Re" +
-      "plicationState\022&\n\005state\030\001 \002(\0162\027.Replicat",
-      "ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" +
-      "DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" +
-      "\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo" +
-      "ck_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntable_na" +
-      "me\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030\002 \001(\013" +
-      "2\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_s" +
-      "hared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_t" +
-      "ime\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013family_n" +
-      "ame\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026Region" +
-      "StoreSequenceIds\022 \n\030last_flushed_sequenc",
-      "e_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003(\0132\020." +
-      "StoreSequenceIdBE\n*org.apache.hadoop.hba" +
-      "se.protobuf.generatedB\017ZooKeeperProtosH\001" +
-      "\210\001\001\240\001\001"
+      "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"\215\001\n\017" +
+      "ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027r" +
+      "eplicationEndpointImpl\030\002 \001(\t\022\035\n\004data\030\003 \003",
+      "(\0132\017.BytesBytesPair\022&\n\rconfiguration\030\004 \003" +
+      "(\0132\017.NameStringPair\"^\n\020ReplicationState\022" +
+      "&\n\005state\030\001 \002(\0162\027.ReplicationState.State\"" +
+      "\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027R" +
+      "eplicationHLogPosition\022\020\n\010position\030\001 \002(\003" +
+      "\"%\n\017ReplicationLock\022\022\n\nlock_owner\030\001 \002(\t\"" +
+      "\230\001\n\tTableLock\022\036\n\ntable_name\030\001 \001(\0132\n.Tabl" +
+      "eName\022\037\n\nlock_owner\030\002 \001(\0132\013.ServerName\022\021" +
+      "\n\tthread_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007" +
+      "purpose\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\";\n\017St",
+      "oreSequenceId\022\023\n\013family_name\030\001 \002(\014\022\023\n\013se" +
+      "quence_id\030\002 \002(\004\"g\n\026RegionStoreSequenceId" +
+      "s\022 \n\030last_flushed_sequence_id\030\001 \002(\004\022+\n\021s" +
+      "tore_sequence_id\030\002 \003(\0132\020.StoreSequenceId" +
+      "BE\n*org.apache.hadoop.hbase.protobuf.gen" +
+      "eratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9662,7 +10589,7 @@ public final class ZooKeeperProtos {
           internal_static_ReplicationPeer_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ReplicationPeer_descriptor,
-              new java.lang.String[] { "Clusterkey", });
+              new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", });
           internal_static_ReplicationState_descriptor =
             getDescriptor().getMessageTypes().get(7);
           internal_static_ReplicationState_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index 37816da..598385c 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -119,6 +119,9 @@ message ReplicationPeer {
   // clusterkey is the concatenation of the slave cluster's
   // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
   required string clusterkey = 1;
+  optional string replicationEndpointImpl = 2;
+  repeated BytesBytesPair data = 3;
+  repeated NameStringPair configuration = 4;
 }
 
 /**

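For reference, the znode payload for a peer carrying the new fields can be built with the generated classes from the hunk above; a minimal sketch, with a placeholder endpoint class name and configuration key:

    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;

    ZooKeeperProtos.ReplicationPeer peer = ZooKeeperProtos.ReplicationPeer.newBuilder()
        .setClusterkey("zk1.example.com:2181:/hbase")            // required field 1
        .setReplicationEndpointImpl("org.example.MyEndpoint")    // optional field 2 (new)
        .addConfiguration(HBaseProtos.NameStringPair.newBuilder()
            .setName("endpoint.batch.size").setValue("64"))      // repeated field 4 (new)
        .build();
    byte[] znodeData = peer.toByteArray();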
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 1088688..16c294b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -24,7 +24,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnectable;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -40,11 +42,13 @@ import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -168,6 +172,7 @@ public class VerifyReplication extends Configured implements Tool {
       LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
     }
     
+    @Override
     protected void cleanup(Context context) {
       if (replicatedScanner != null) {
         try {
@@ -188,7 +193,7 @@ public class VerifyReplication extends Configured implements Tool {
 
   private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
     ZooKeeperWatcher localZKW = null;
-    ReplicationPeer peer = null;
+    ReplicationPeerZKImpl peer = null;
     try {
       localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
           new Abortable() {
@@ -199,11 +204,11 @@ public class VerifyReplication extends Configured implements Tool {
       ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
       rp.init();
 
-      Configuration peerConf = rp.getPeerConf(peerId);
-      if (peerConf == null) {
+      Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
+      if (pair == null) {
         throw new IOException("Couldn't get peer conf!");
       }
-
+      Configuration peerConf = pair.getSecond();
       return ZKUtil.getZooKeeperClusterKey(peerConf);
     } catch (ReplicationException e) {
       throw new IOException(

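The getPeerConf() signature change above is the API-visible side of the new peer fields: callers now receive both the peer's ReplicationPeerConfig (endpoint class, data, configuration) and the compounded Configuration. A minimal sketch of the new call pattern:

    Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
    ReplicationPeerConfig peerConfig = pair.getFirst();  // new: endpoint impl + per-peer settings
    Configuration peerConf = pair.getSecond();           // what getPeerConf() used to return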
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 3bc8173..c9150fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -119,6 +120,7 @@ public interface HLog {
    * Only used when splitting logs.
    */
   // TODO: Remove this Writable.
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Entry implements Writable {
     private WALEdit edit;
     private HLogKey key;
@@ -212,12 +214,12 @@ public interface HLog {
    * @return the number of HLog files
    */
   int getNumLogFiles();
-  
+
   /**
    * @return the size of HLog files
    */
   long getLogFileSize();
-  
+
   // TODO: Log rolling should not be in this interface.
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
@@ -300,6 +302,7 @@ public interface HLog {
    * @return txid of this transaction
    * @throws IOException
    */
+  @Deprecated
   long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
       List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
       boolean isInMemstore, long nonceGroup, long nonce) throws IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 3d69e34..9464c51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -31,17 +31,18 @@ import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
 
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
@@ -61,7 +62,7 @@ import com.google.protobuf.ByteString;
  */
 // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
 //       purposes. They need to be merged into HLogEntry.
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class HLogKey implements WritableComparable<HLogKey> {
   public static final Log LOG = LogFactory.getLog(HLogKey.class);
 
@@ -297,6 +298,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
     return result;
   }
 
+  @Override
   public int compareTo(HLogKey o) {
     int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
     if (result == 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 2b67174..a46c88d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.codec.Codec;
@@ -75,7 +76,7 @@ import org.apache.hadoop.io.Writable;
  * is an old style KeyValue or the new style WALEdit.
  *
  */
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class WALEdit implements Writable, HeapSize {
   public static final Log LOG = LogFactory.getLog(WALEdit.class);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
new file mode 100644
index 0000000..1c9d0f6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractService;
+
+/**
+ * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
+ * class rather than implementing {@link ReplicationEndpoint} directly for better backwards
+ * compatibility.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public abstract class BaseReplicationEndpoint extends AbstractService
+  implements ReplicationEndpoint {
+
+  protected Context ctx;
+
+  @Override
+  public void init(Context context) throws IOException {
+    this.ctx = context;
+  }
+
+  /** Returns a default set of filters */
+  @Override
+  public WALEntryFilter getWALEntryfilter() {
+    ArrayList<WALEntryFilter> filters = Lists.newArrayList();
+    WALEntryFilter scopeFilter = getScopeWALEntryFilter();
+    if (scopeFilter != null) {
+      filters.add(scopeFilter);
+    }
+    WALEntryFilter tableCfFilter = getTableCfWALEntryFilter();
+    if (tableCfFilter != null) {
+      filters.add(tableCfFilter);
+    }
+    return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
+  }
+
+  /** Returns a WALEntryFilter for checking the scope. Subclasses can
+   * return null if they don't want this filter */
+  protected WALEntryFilter getScopeWALEntryFilter() {
+    return new ScopeWALEntryFilter();
+  }
+
+  /** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can
+   * return null if they don't want this filter */
+  protected WALEntryFilter getTableCfWALEntryFilter() {
+    return new TableCfWALEntryFilter(ctx.getReplicationPeer());
+  }
+
+  @Override
+  public boolean canReplicateToSameCluster() {
+    return false;
+  }
+
+}

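For illustration, a minimal endpoint built on this base class could look like the following sketch. The class name, logging behavior, and fixed UUID are hypothetical; only the inherited hooks (init, the filter getters, and the Guava Service lifecycle methods) come from the class above.

    import java.util.UUID;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;

    // Hypothetical endpoint that "ships" edits by logging them.
    public class LoggingReplicationEndpoint extends BaseReplicationEndpoint {

      private static final Log LOG = LogFactory.getLog(LoggingReplicationEndpoint.class);

      // Identifies the (non-HBase) target system; getPeerUUID() must
      // uniquely identify the target when it is not a real HBase cluster.
      private final UUID targetUUID = UUID.randomUUID();

      @Override
      public UUID getPeerUUID() {
        return targetUUID;
      }

      @Override
      public boolean replicate(ReplicateContext context) {
        // Entries arriving here have already passed the default scope and
        // table-CF filters installed by getWALEntryfilter() above.
        LOG.info("Would ship " + context.getEntries().size() + " WAL entries");
        return true; // success: the source may advance its log position
      }

      @Override
      protected void doStart() {
        notifyStarted(); // Guava AbstractService lifecycle hook
      }

      @Override
      protected void doStop() {
        notifyStopped();
      }
    }
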
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
new file mode 100644
index 0000000..f701e94
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * A {@link WALEntryFilter} which wraps multiple filters and applies them
+ * in chain order; once one filter drops an entry, the rest are skipped
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public class ChainWALEntryFilter implements WALEntryFilter {
+
+  private final WALEntryFilter[] filters;
+
+  public ChainWALEntryFilter(WALEntryFilter... filters) {
+    this.filters = filters;
+  }
+
+  public ChainWALEntryFilter(List<WALEntryFilter> filters) {
+    ArrayList<WALEntryFilter> rawFilters = new ArrayList<WALEntryFilter>(filters.size());
+    // flatten the chains
+    for (WALEntryFilter filter : filters) {
+      if (filter instanceof ChainWALEntryFilter) {
+        for (WALEntryFilter f : ((ChainWALEntryFilter) filter).filters) {
+          rawFilters.add(f);
+        }
+      } else {
+        rawFilters.add(filter);
+      }
+    }
+
+    this.filters = rawFilters.toArray(new WALEntryFilter[rawFilters.size()]);
+  }
+
+  @Override
+  public Entry filter(Entry entry) {
+    for (WALEntryFilter filter : filters) {
+      if (entry == null) {
+        return null;
+      }
+      entry = filter.filter(entry);
+    }
+    return entry;
+  }
+
+}

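As a usage sketch (the wrapper class is hypothetical), composing two of the filters added by this commit:

    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
    import org.apache.hadoop.hbase.replication.ScopeWALEntryFilter;
    import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
    import org.apache.hadoop.hbase.replication.WALEntryFilter;

    public class ChainFilterExample {
      public static HLog.Entry applyChain(HLog.Entry entry) {
        // The varargs constructor takes the filters as-is; the List
        // constructor would additionally flatten nested chains.
        WALEntryFilter chain = new ChainWALEntryFilter(
            new SystemTableWALEntryFilter(),
            new ScopeWALEntryFilter());
        // The chain short-circuits: once a filter returns null,
        // the remaining filters are not applied.
        return chain.filter(entry);
      }
    }
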
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
new file mode 100644
index 0000000..3929390
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.AuthFailedException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+
+/**
+ * A {@link BaseReplicationEndpoint} for replication endpoints whose
+ * target cluster is an HBase cluster.
+ */
+@InterfaceAudience.Private
+public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
+  implements Abortable {
+
+  private static final Log LOG = LogFactory.getLog(HBaseReplicationEndpoint.class);
+
+  private ZooKeeperWatcher zkw = null;
+
+  private List<ServerName> regionServers = new ArrayList<ServerName>(0);
+  private volatile long lastRegionServerUpdate;
+
+  protected void disconnect() {
+    if (zkw != null) {
+      zkw.close();
+    }
+  }
+
+  /**
+   * Re-establish the ZooKeeper session with the peer cluster.
+   * @param ke the exception that triggered this reconnect attempt
+   */
+  protected void reconnect(KeeperException ke) {
+    if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
+        || ke instanceof AuthFailedException) {
+      String clusterKey = ctx.getPeerConfig().getClusterKey();
+      LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke);
+      try {
+        reloadZkWatcher();
+      } catch (IOException io) {
+        LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io);
+      }
+    }
+  }
+
+  @Override
+  protected void doStart() {
+    try {
+      reloadZkWatcher();
+      notifyStarted();
+    } catch (IOException e) {
+      notifyFailed(e);
+    }
+  }
+
+  @Override
+  protected void doStop() {
+    disconnect();
+    notifyStopped();
+  }
+
+  @Override
+  // Synchronize peer cluster connection attempts to avoid races and rate
+  // limit connections when multiple replication sources try to connect to
+  // the peer cluster. If the peer cluster is down, reconnection attempts can
+  // otherwise grow out of control over time.
+  public synchronized UUID getPeerUUID() {
+    UUID peerUUID = null;
+    try {
+      peerUUID = ZKClusterId.getUUIDForCluster(zkw);
+    } catch (KeeperException ke) {
+      reconnect(ke);
+    }
+    return peerUUID;
+  }
+
+  /**
+   * Get the ZK connection to this peer
+   * @return zk connection
+   */
+  protected ZooKeeperWatcher getZkw() {
+    return zkw;
+  }
+
+  /**
+   * Closes the current ZKW (if not null) and creates a new one
+   * @throws IOException If anything goes wrong connecting
+   */
+  void reloadZkWatcher() throws IOException {
+    if (zkw != null) zkw.close();
+    zkw = new ZooKeeperWatcher(ctx.getConfiguration(),
+        "connection to cluster: " + ctx.getPeerId(), this);
+    getZkw().registerListener(new PeerRegionServerListener(this));
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.fatal("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
+        + " was aborted for the following reason(s):" + why, e);
+  }
+
+  @Override
+  public boolean isAborted() {
+    // Currently this is never "aborted"; we just log when the abort method is called.
+    return false;
+  }
+
+  /**
+   * Get the list of all the region servers from the specified peer
+   * @param zkw zk connection to use
+   * @return list of region server addresses or an empty list if the slave is unavailable
+   */
+  protected static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
+      throws KeeperException {
+    List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
+    if (children == null) {
+      return Collections.emptyList();
+    }
+    List<ServerName> addresses = new ArrayList<ServerName>(children.size());
+    for (String child : children) {
+      addresses.add(ServerName.parseServerName(child));
+    }
+    return addresses;
+  }
+
+  /**
+   * Get a list of all the addresses of all the region servers
+   * for this peer cluster
+   * @return list of addresses
+   * @throws KeeperException
+   */
+  // Synchronize peer cluster connection attempts to avoid races and rate
+  // limit connections when multiple replication sources try to connect to
+  // the peer cluster. If the peer cluster is down, reconnection attempts can
+  // otherwise grow out of control over time.
+  public synchronized List<ServerName> getRegionServers() {
+    try {
+      setRegionServers(fetchSlavesAddresses(this.getZkw()));
+    } catch (KeeperException ke) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Fetch salves addresses failed.", ke);
+      }
+      reconnect(ke);
+    }
+    return regionServers;
+  }
+
+  /**
+   * Set the list of region servers for that peer
+   * @param regionServers list of addresses for the region servers
+   */
+  public void setRegionServers(List<ServerName> regionServers) {
+    this.regionServers = regionServers;
+    lastRegionServerUpdate = System.currentTimeMillis();
+  }
+
+  /**
+   * Get the timestamp at which the last change occurred to the list of region servers to replicate
+   * to.
+   * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
+   */
+  public long getLastRegionServerUpdate() {
+    return lastRegionServerUpdate;
+  }
+
+  /**
+   * Tracks changes to the list of region servers in a peer's cluster.
+   */
+  public static class PeerRegionServerListener extends ZooKeeperListener {
+
+    private final HBaseReplicationEndpoint replicationEndpoint;
+    private final String regionServerListNode;
+
+    public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) {
+      super(replicationPeer.getZkw());
+      this.replicationEndpoint = replicationPeer;
+      this.regionServerListNode = replicationEndpoint.getZkw().rsZNode;
+    }
+
+    @Override
+    public synchronized void nodeChildrenChanged(String path) {
+      if (path.equals(regionServerListNode)) {
+        try {
+          LOG.info("Detected change to peer region servers, fetching updated list");
+          replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
+        } catch (KeeperException e) {
+          LOG.fatal("Error reading slave addresses", e);
+        }
+      }
+    }
+  }
+}

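A hypothetical subclass might consume the cached region server list like this; the sink-selection policy is illustrative only and not part of the patch:

    import java.util.List;
    import java.util.Random;

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;

    // Hypothetical base for endpoints that ship to one random sink at a time.
    public abstract class RandomSinkEndpoint extends HBaseReplicationEndpoint {

      private final Random random = new Random();

      protected ServerName chooseSink() {
        // getRegionServers() refreshes the list from the peer's ZooKeeper
        // (reconnecting on ZK errors); PeerRegionServerListener keeps the
        // cached list current between calls.
        List<ServerName> sinks = getRegionServers();
        if (sinks.isEmpty()) {
          return null; // peer cluster unavailable; caller should retry later
        }
        return sinks.get(random.nextInt(sinks.size()));
      }
    }
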
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
new file mode 100644
index 0000000..90e3460
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+
+import com.google.common.util.concurrent.Service;
+
+/**
+ * ReplicationEndpoint is a plugin which implements replication
+ * to other HBase clusters, or to other systems. The ReplicationEndpoint
+ * implementation to use can be configured at peer creation time
+ * via the {@link ReplicationPeerConfig}. A ReplicationEndpoint runs in a thread
+ * in each region server, in the same process.
+ * <p>
+ * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer
+ * relation. ReplicationSource is an HBase-private class which tails the logs, manages
+ * the queue of logs, and handles the persistence of all replication state.
+ * ReplicationEndpoint, on the other hand, is responsible for the actual shipping
+ * and persisting of the WAL entries in the other cluster.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationEndpoint extends Service {
+
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+  class Context {
+    private final Configuration conf;
+    private final FileSystem fs;
+    private final ReplicationPeerConfig peerConfig;
+    private final ReplicationPeer replicationPeer;
+    private final String peerId;
+    private final UUID clusterId;
+    private final MetricsSource metrics;
+
+    @InterfaceAudience.Private
+    public Context(
+        final Configuration conf,
+        final FileSystem fs,
+        final ReplicationPeerConfig peerConfig,
+        final String peerId,
+        final UUID clusterId,
+        final ReplicationPeer replicationPeer,
+        final MetricsSource metrics) {
+      this.peerConfig = peerConfig;
+      this.conf = conf;
+      this.fs = fs;
+      this.clusterId = clusterId;
+      this.peerId = peerId;
+      this.replicationPeer = replicationPeer;
+      this.metrics = metrics;
+    }
+    public Configuration getConfiguration() {
+      return conf;
+    }
+    public FileSystem getFilesystem() {
+      return fs;
+    }
+    public UUID getClusterId() {
+      return clusterId;
+    }
+    public String getPeerId() {
+      return peerId;
+    }
+    public ReplicationPeerConfig getPeerConfig() {
+      return peerConfig;
+    }
+    public ReplicationPeer getReplicationPeer() {
+      return replicationPeer;
+    }
+    public MetricsSource getMetrics() {
+      return metrics;
+    }
+  }
+
+  /**
+   * Initialize the replication endpoint with the given context.
+   * @param context replication context
+   * @throws IOException
+   */
+  void init(Context context) throws IOException;
+
+  /** Returns whether the replication endpoint can replicate to its source cluster
+   * with the same UUID */
+  boolean canReplicateToSameCluster();
+
+  /**
+   * Returns the UUID of the peer cluster. Every HBase cluster instance has a persisted
+   * associated UUID. If the replication is not performed to an actual HBase cluster (but
+   * some other system), the UUID returned has to uniquely identify the connected target system.
+   * @return a UUID or null if the peer cluster does not exist or is not connected.
+   */
+  UUID getPeerUUID();
+
+  /**
+   * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication
+   * infrastructure will call this filter before sending the edits to shipEdits().
+   * @return a {@link WALEntryFilter} or null.
+   */
+  WALEntryFilter getWALEntryfilter();
+
+  /**
+   * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
+   */
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+  class ReplicateContext {
+    List<HLog.Entry> entries;
+    int size;
+    @InterfaceAudience.Private
+    public ReplicateContext() {
+    }
+
+    public ReplicateContext setEntries(List<HLog.Entry> entries) {
+      this.entries = entries;
+      return this;
+    }
+    public ReplicateContext setSize(int size) {
+      this.size = size;
+      return this;
+    }
+    public List<HLog.Entry> getEntries() {
+      return entries;
+    }
+    public int getSize() {
+      return size;
+    }
+  }
+
+  /**
+   * Replicate the given set of entries (in the context) to the other cluster.
+   * Can block until all the given entries are replicated. Once this method returns,
+   * all entries that were passed in the context are assumed to be persisted in the
+   * target cluster.
+   * @param replicateContext a context where WAL entries and other
+   * parameters can be obtained.
+   */
+  boolean replicate(ReplicateContext replicateContext);
+
+}

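To wire a custom endpoint to a peer, something along these lines should work, assuming the ReplicationAdmin#addPeer(String, ReplicationPeerConfig, Map) overload and the ReplicationPeerConfig setters introduced by the parent HBASE-11367 change; the endpoint class name and cluster key are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class AddCustomPeer {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        ReplicationAdmin admin = new ReplicationAdmin(conf);
        try {
          // The endpoint class is resolved by name on each region server;
          // it maps to the new replicationEndpointImpl field persisted with
          // the peer (see the ZooKeeperProtos change in this commit).
          ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
          peerConfig.setClusterKey("zk1.example.com:2181:/hbase"); // placeholder
          peerConfig.setReplicationEndpointImpl(
              "org.example.LoggingReplicationEndpoint"); // hypothetical class
          admin.addPeer("1", peerConfig, null); // null tableCfs: all tables
        } finally {
          admin.close();
        }
      }
    }
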
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
new file mode 100644
index 0000000..82d976d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * Keeps only KVs whose replication scope is other than local
+ */
+@InterfaceAudience.Private
+public class ScopeWALEntryFilter implements WALEntryFilter {
+
+  @Override
+  public Entry filter(Entry entry) {
+    NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
+    if (scopes == null || scopes.isEmpty()) {
+      return null;
+    }
+    ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+    int size = kvs.size();
+    for (int i = size - 1; i >= 0; i--) {
+      KeyValue kv = kvs.get(i);
+      // Drop the KV if its family has no scope entry or is
+      // scoped local-only, i.e. there's nothing to replicate
+      if (!scopes.containsKey(kv.getFamily())
+          || scopes.get(kv.getFamily()) == HConstants.REPLICATION_SCOPE_LOCAL) {
+        kvs.remove(i);
+      }
+    }
+    if (kvs.size() < size / 2) {
+      kvs.trimToSize();
+    }
+    return entry;
+  }
+
+}

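For context, the scope this filter checks is set per column family on the table descriptor; a sketch with one replicated and one local-only family (table and family names are placeholders):

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    public class ScopedTableExample {
      public static HTableDescriptor buildDescriptor() {
        // Only families with a non-local replication scope survive
        // ScopeWALEntryFilter; everything else is stripped from the edit.
        HTableDescriptor table = new HTableDescriptor(TableName.valueOf("demo"));
        HColumnDescriptor replicated = new HColumnDescriptor("cf1");
        replicated.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); // replicated
        HColumnDescriptor localOnly = new HColumnDescriptor("cf2");
        localOnly.setScope(HConstants.REPLICATION_SCOPE_LOCAL);   // default: local
        table.addFamily(replicated);
        table.addFamily(localOnly);
        return table;
      }
    }
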
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
new file mode 100644
index 0000000..e84ac18
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * Skips WAL edits for all system tables, including META
+ */
+@InterfaceAudience.Private
+public class SystemTableWALEntryFilter implements WALEntryFilter {
+  @Override
+  public Entry filter(Entry entry) {
+    if (entry.getKey().getTablename().isSystemTable()) {
+      return null;
+    }
+    return entry;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
new file mode 100644
index 0000000..b7dfc4e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TableCfWALEntryFilter implements WALEntryFilter {
+
+  private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
+  private final ReplicationPeer peer;
+
+  public TableCfWALEntryFilter(ReplicationPeer peer) {
+    this.peer = peer;
+  }
+
+  @Override
+  public Entry filter(Entry entry) {
+    String tabName = entry.getKey().getTablename().getNameAsString();
+    ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+    Map<String, List<String>> tableCFs = null;
+
+    try {
+      tableCFs = this.peer.getTableCFs();
+    } catch (IllegalArgumentException e) {
+      LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
+          ", degenerate as if it's not configured by keeping tableCFs==null");
+    }
+    int size = kvs.size();
+
+    // return null (prevent replicating) if the logKey's table isn't in this peer's
+    // replicable table list (an empty tableCFs means all tables are replicable)
+    if (tableCFs != null && !tableCFs.containsKey(tabName)) {
+      return null;
+    } else {
+      List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
+      for (int i = size - 1; i >= 0; i--) {
+        KeyValue kv = kvs.get(i);
+        // ignore (remove) the kv if its cf isn't in the replicable cf list
+        // (empty cfs means all cfs of this table are replicable)
+        if ((cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
+          kvs.remove(i);
+        }
+      }
+    }
+    if (kvs.size() < size / 2) {
+      kvs.trimToSize();
+    }
+    return entry;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
new file mode 100644
index 0000000..c054978
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+/**
+ * A filter applied to WAL entries before they are shipped by replication. Multiple
+ * filters can be chained together using {@link ChainWALEntryFilter}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface WALEntryFilter {
+
+  /**
+   * Applies the filter, possibly returning a different HLog.Entry instance.
+   * If null is returned, the entry will be skipped.
+   * @param entry WAL Entry to filter
+   * @return a (possibly modified) HLog.Entry to use. Returning null or an entry with
+   * no cells will cause the entry to be skipped for replication.
+   */
+  public HLog.Entry filter(HLog.Entry entry);
+
+}

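A custom filter is just an implementation of this interface; for example, a hypothetical filter that drops all edits for one table (the table name is a placeholder):

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.replication.WALEntryFilter;

    // Hypothetical filter: skip replication for one specific table.
    public class SkipTableWALEntryFilter implements WALEntryFilter {

      private final TableName skipped = TableName.valueOf("audit_log");

      @Override
      public HLog.Entry filter(HLog.Entry entry) {
        // Returning null tells the replication source to skip this entry.
        if (entry.getKey().getTablename().equals(skipped)) {
          return null;
        }
        return entry;
      }
    }
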
http://git-wip-us.apache.org/repos/asf/hbase/blob/25be1468/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 9667c43..8f099d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -26,14 +26,11 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
@@ -58,12 +55,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
 
   @Override
   public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
-   // all members of this class are null if replication is disabled, 
+   // all members of this class are null if replication is disabled,
    // so we cannot filter the files
     if (this.getConf() == null) {
       return files;
     }
-    
+
     final Set<String> hlogs = loadHLogsFromQueues();
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
@@ -138,8 +135,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       LOG.info("Stopping " + this.zkw);
       this.zkw.close();
     }
-    // Not sure why we're deleting a connection that we never acquired or used
-    HConnectionManager.deleteConnection(this.getConf());
   }
 
   @Override

