hbase-commits mailing list archives

From jdcry...@apache.org
Subject svn commit: r907011 - in /hadoop/hbase/trunk: ./ src/contrib/mdc_replication/ src/contrib/mdc_replication/bin/ src/contrib/mdc_replication/src/ src/contrib/mdc_replication/src/java/ src/contrib/mdc_replication/src/java/org/ src/contrib/mdc_replication/...
Date Fri, 05 Feb 2010 17:37:55 GMT
Author: jdcryans
Date: Fri Feb  5 17:37:54 2010
New Revision: 907011

URL: http://svn.apache.org/viewvc?rev=907011&view=rev
Log:
HBASE-2129  Simple Master/Slave replication

Added:
    hadoop/hbase/trunk/src/contrib/mdc_replication/
    hadoop/hbase/trunk/src/contrib/mdc_replication/bin/
    hadoop/hbase/trunk/src/contrib/mdc_replication/bin/add_peer.rb
    hadoop/hbase/trunk/src/contrib/mdc_replication/bin/copy_tables_desc.rb
    hadoop/hbase/trunk/src/contrib/mdc_replication/build.xml
    hadoop/hbase/trunk/src/contrib/mdc_replication/ivy.xml
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/
    hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=907011&r1=907010&r2=907011&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Feb  5 17:37:54 2010
@@ -362,6 +362,7 @@
                hfiles direct) uploader
    HBASE-1433  Update hbase build to match core, use ivy, publish jars to maven
                repo, etc. (Kay Kay via Stack)
+   HBASE-2129  Simple Master/Slave replication
 
   OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/bin/add_peer.rb
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/bin/add_peer.rb?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/bin/add_peer.rb (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/bin/add_peer.rb Fri Feb  5 17:37:54 2010
@@ -0,0 +1,63 @@
+# Script to add a peer to a cluster
+# To see usage for this script, run: 
+#
+#  ${HBASE_HOME}/bin/hbase org.jruby.Main add_peer.rb
+#
+
+include Java
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.EmptyWatcher
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
+
+# Name of this script
+NAME = "add_peer"
+
+# Print usage for this script
+def usage
+  puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
+  exit!
+end
+
+if ARGV.size != 2
+  usage
+end
+
+LOG = LogFactory.getLog(NAME)
+
+parts1 = ARGV[0].split(":")
+LOG.info("Master cluster located at " + parts1[0] + " port " + parts1[1] + " in folder " + parts1[2])
+
+c2 = HBaseConfiguration.create()
+parts2 = ARGV[1].split(":")
+LOG.info("Slave cluster located at " + parts2[0] + " port " + parts2[1] + " in folder " + parts2[2])
+
+LOG.info("The addresses must be exactly the same as those in hbase-site.xml of each cluster.")
+print "Are those info correct? [Y/n] "
+answer = $stdin.gets.chomp
+
+# Exit unless the user accepted (an empty answer defaults to yes)
+if answer.length != 0 && answer.downcase != "y" && answer.downcase != "yes"
+  exit!
+end
+
+c1 = HBaseConfiguration.create()
+c1.set(HConstants.ZOOKEEPER_QUORUM, parts1[0])
+c1.set("hbase.zookeeper.property.clientPort", parts1[1]) 
+c1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts1[2])
+
+zkw1 = ZooKeeperWrapper.new(c1, EmptyWatcher.instance)
+zkw1.writeZNode(parts1[2], "replication", "a")
+zkw1.writeZNode(parts1[2] + "/replication", "master", ARGV[0]);
+zkw1.writeZNode(parts1[2] + "/replication", "state", "true");
+zkw1.writeZNode(parts1[2] + "/replication/peers", "test", ARGV[1]);
+
+
+c2.set(HConstants.ZOOKEEPER_QUORUM, parts2[0])
+c2.set("hbase.zookeeper.property.clientPort", parts2[1]) 
+c2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts2[2])
+
+zkw2 = ZooKeeperWrapper.new(c2, EmptyWatcher.instance)
+zkw2.writeZNode(parts2[2], "replication", "a")
+zkw2.writeZNode(parts2[2] + "/replication", "master", ARGV[0]);

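For illustration, here is how the script above might be invoked, following its
usage string. The host names and znode parents below are placeholders; each
argument is a colon-separated descriptor of one cluster's ZooKeeper ensemble
(comma-separated quorum hosts, client port, znode parent):

  $HBASE_HOME/bin/hbase org.jruby.Main add_peer.rb \
      master-zk1,master-zk2,master-zk3:2181:/hbase \
      slave-zk1,slave-zk2,slave-zk3:2181:/hbase
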
Added: hadoop/hbase/trunk/src/contrib/mdc_replication/bin/copy_tables_desc.rb
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/bin/copy_tables_desc.rb?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/bin/copy_tables_desc.rb (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/bin/copy_tables_desc.rb Fri Feb  5 17:37:54 2010
@@ -0,0 +1,61 @@
+# Script to recreate all tables from one cluster to another
+# To see usage for this script, run: 
+#
+#  ${HBASE_HOME}/bin/hbase org.jruby.Main copy_tables_desc.rb
+#
+
+include Java
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.EmptyWatcher
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.HTableDescriptor
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
+
+# Name of this script
+NAME = "copy_tables_desc"
+
+# Print usage for this script
+def usage
+  puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
+  exit!
+end
+
+if ARGV.size != 2
+  usage
+end
+
+LOG = LogFactory.getLog(NAME)
+
+parts1 = ARGV[0].split(":")
+LOG.info("Master cluster located at " + parts1[0] + " port " + parts1[1] + " in folder " + parts1[2])
+
+parts2 = ARGV[1].split(":")
+LOG.info("Slave cluster located at " + parts2[0] + " port " + parts2[1] + " in folder " + parts2[2])
+
+print "Are those info correct? [Y/n] "
+answer = $stdin.gets.chomp
+
+# Exit unless the user accepted (an empty answer defaults to yes)
+if answer.length != 0 && answer.downcase != "y" && answer.downcase != "yes"
+  exit!
+end
+
+c1 = HBaseConfiguration.create()
+c1.set(HConstants.ZOOKEEPER_QUORUM, parts1[0])
+c1.set("hbase.zookeeper.property.clientPort", parts1[1]) 
+c1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts1[2])
+
+admin1 = HBaseAdmin.new(c1)
+
+c2 = HBaseConfiguration.create()
+c2.set(HConstants.ZOOKEEPER_QUORUM, parts2[0])
+c2.set("hbase.zookeeper.property.clientPort", parts2[1]) 
+c2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts2[2])
+
+admin2 = HBaseAdmin.new(c2)
+
+for t in admin1.listTables()
+  admin2.createTable(t)
+end

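A hypothetical invocation of the script above, mirroring its usage string
(host names are placeholders); it recreates every table descriptor from the
master cluster on the slave:

  $HBASE_HOME/bin/hbase org.jruby.Main copy_tables_desc.rb \
      master-zk1:2181:/hbase slave-zk1:2181:/hbase
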
Added: hadoop/hbase/trunk/src/contrib/mdc_replication/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/build.xml?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/build.xml (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/build.xml Fri Feb  5 17:37:54 2010
@@ -0,0 +1,52 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!-- 
+Before you can run these subtargets directly, you need 
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="mdc_replication" default="jar">
+  <property name="hbase.root" location="../../.." />
+  <import file="../build-contrib.xml"/>
+
+  <path id="classpath">
+    <path refid="contrib.classpath"/>
+  </path>
+
+  <path id="test.classpath">
+    <path refid="test.contrib.classpath"/>
+    <fileset dir="${hbase.root}/lib">
+      <include name="zookeeper*.jar" />
+    </fileset>
+  </path>
+
+  <target name="package" depends="jar, jar-examples" unless="skip.contrib"> 
+    <mkdir dir="${dist.dir}/contrib/${name}"/>
+    <copy todir="${dist.dir}/contrib/${name}" includeEmptyDirs="false" flatten="true">
+      <fileset dir="${build.dir}">
+        <include name="hbase-${version}-${name}.jar" />
+      </fileset>
+      <fileset dir="${basedir}/bin">
+        <include name="add_peer.rb" />
+      </fileset>
+
+    </copy>
+  </target>
+
+</project>

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/ivy.xml?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/ivy.xml (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/ivy.xml Fri Feb  5 17:37:54 2010
@@ -0,0 +1,107 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<ivy-module version="1.0">
+  <info organisation="org.apache.hadoop" module="${ant.project.name}" revision="${version}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
+    <description>
+        Hadoop Core
+    </description>
+  </info>
+  <configurations defaultconfmapping="default">
+    <!--these match the Maven configurations-->
+    <conf name="default" extends="master,runtime"/>
+    <conf name="master" description="contains the artifact but no dependencies"/>
+    <conf name="runtime" description="runtime but not the artifact"
+      extends="client,server,s3-server,kfs,mandatory,jetty,ftp"/>
+
+    <conf name="mandatory" description="contains the critical  dependencies"
+      extends="commons-logging,log4j"/>
+
+    <!--
+    These public configurations contain the core dependencies for running hadoop client or server.
+    The server is effectively a superset of the client.
+    -->
+    <conf name="client" description="client-side dependencies"
+      extends="mandatory,httpclient"/>
+    <conf name="server" description="server-side dependencies"
+      extends="client"/>
+    <conf name="s3-client" description="dependencies for working with S3/EC2 infrastructure"
+      extends="client"/>
+    <conf name="s3-server" description="dependencies for running on S3/EC2 infrastructure"
+      extends="s3-client,server"/>
+    <conf name="kfs" description="dependencies for KFS file system support"/>
+    <conf name="ftp" description="dependencies for workign with FTP filesytems" 
+              extends="mandatory"/>
+   <conf name="jetty" description="Jetty provides the in-VM HTTP daemon" extends="commons-logging"/>
+
+   <conf name="common" extends="runtime,mandatory,httpclient,ftp,jetty"
+          description="common artifacts"/>
+    <!--Testing pulls in everything-->
+   <conf name="test" extends="master,common" description="the classpath needed to run tests"/>
+
+    <!--Private configurations. -->
+
+    <conf name="javadoc" visibility="private" description="artifacts required while performing doc generation"
+      extends="common,mandatory,jetty,lucene"/>
+
+    <conf name="releaseaudit" visibility="private"
+  description="Artifacts required for releaseaudit target"/>
+     
+    <conf name="commons-logging" visibility="private"/>
+    <conf name="httpclient" visibility="private" extends="commons-logging"/>
+    <conf name="log4j" visibility="private"/>
+    <conf name="lucene" visibility="private"/>
+    <conf name="jdiff" visibility="private" extends="log4j,s3-client,jetty,server"/>
+    <conf name="checkstyle" visibility="private"/>
+
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+  <dependencies>
+   <!--  Common -->
+    <dependency org="org.apache.hadoop" name="hadoop-core" 
+               rev="${hadoop-core.version}" conf="common->default" changing="true" >
+      <exclude conf="test"/>  
+    </dependency>             
+    <dependency org="org.apache.hadoop" name="hadoop-hdfs" 
+               rev="${hadoop-hdfs.version}" conf="common->default" changing="true" >
+      <exclude conf="test"/>  
+    </dependency>
+    <dependency org="org.jruby" name="jruby-complete"
+              rev="${jruby.version}" conf="common->default" />
+    
+   <!--  Test  -->
+   <!-- 
+   Test Zookeeper cluster
+    <dependency org="org.apache.hadoop" name="hadoop-mapred-test" 
+               rev="${hadoop-mapred.version}" conf="test->default"/> 
+   -->                
+    <dependency org="org.apache.hadoop" name="hadoop-core-test" 
+               rev="${hadoop-core.version}" conf="test->default"  transitive="false" changing="true" />
+    <dependency org="org.apache.hadoop" name="hadoop-hdfs-test" 
+               rev="${hadoop-hdfs.version}" conf="test->default" transitive="false" changing="true"/>
+    <dependency org="log4j" name="log4j" 
+               rev="${log4j.version}" conf="test->master">
+      <exclude conf="jmx,mail,jms"/>  
+    </dependency>                
+  </dependencies>
+</ivy-module>

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,42 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+/**
+ * Helper class to add RPC-related configs for replication
+ */
+public class ReplicationRPC {
+
+  private static final byte RPC_CODE = 110;
+
+  private static boolean initialized = false;
+
+  public static synchronized void initialize() {
+    if (initialized) {
+      return;
+    }
+    HBaseRPC.addToMap(ReplicationRegionInterface.class, RPC_CODE);
+    initialized = true;
+  }
+
+  private ReplicationRPC() {
+    // Static helper class;
+  }
+}

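A consumer must register the replication protocol before any RPC is served.
As a point of reference, the ReplicationRegionServer later in this commit does
exactly that in a static initializer:

  static {
    // Map ReplicationRegionInterface to its RPC code before serving.
    ReplicationRPC.initialize();
  }
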
Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+import java.io.IOException;
+
+/**
+ * Interface that defines replication
+ */
+public interface ReplicationRegionInterface extends HRegionInterface {
+
+  /**
+   * Replicates the given entries. The guarantee is that the given entries
+   * will be durable on the slave cluster if this method returns without
+   * an exception.
+   * @param entries entries to replicate
+   * @throws IOException
+   */
+  public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
+
+}

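As a rough sketch of how a master-side caller pushes edits through this
interface (this mirrors what ReplicationSource.chore() further down in this
commit does; the class name and slave address are placeholders, and error
handling is omitted):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HServerAddress;
  import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
  import org.apache.hadoop.hbase.regionserver.wal.HLog;
  import org.apache.hadoop.hbase.replication.ReplicationConnectionManager;

  public class ReplicationPushSketch {
    public static void push(HLog.Entry[] entries) throws IOException {
      Configuration conf = HBaseConfiguration.create();
      // Connection manager added by this commit; hands out replication proxies.
      ReplicationConnectionManager conn = new ReplicationConnectionManager(conf);
      // Placeholder address of a region server on the slave cluster.
      HServerAddress slave = new HServerAddress("10.10.1.49:62020");
      ReplicationRegionInterface rrs = conn.getHRegionConnection(slave);
      // Entries are durable on the slave if this returns without an exception.
      rrs.replicateLogEntries(entries);
    }
  }
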
Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html Fri Feb  5 17:37:54 2010
@@ -0,0 +1,136 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+
+<!--
+   Copyright 2010 The Apache Software Foundation
+
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<head />
+<body bgcolor="white">
+<h1>Multi Data Center Replication</h1>
+This package provides replication between HBase clusters.
+<p>
+
+<h2>Table Of Contents</h2>
+<ol>
+    <li><a href="#status">Status</a></li>
+    <li><a href="#requirements">Requirements</a></li>
+    <li><a href="#deployment">Deployment</a></li>
+</ol>
+
+<p>
+<a name="status">
+<h2>Status</h2>
+</a>
+<p>
+This package isn't even alpha quality software and is only meant to be a base
+for future development. The current implementation offers the following
+features:
+
+<ol>
+    <li>Master/Slave replication limited to 1 slave. </li>
+    <li>Replication of all user tables.</li>
+    <li>Start/stop replication stream.</li>
+    <li>Supports clusters of different sizes.</li>
+    <li>Re-replication of entries from failed region
+        servers on the master cluster.</li>
+</ol>
+Please report bugs on the project's Jira when found.
+<p>
+<a name="requirements">
+<h2>Requirements</h2>
+</a>
+<p>
+
+Before trying out replication, make sure to review the following requirements:
+
+<ol>
+    <li>You must manage ZooKeeper yourself rather than letting HBase manage
+    it, and it must remain available throughout the deployment. Currently
+    you can't use zoo.cfg to hold your ZooKeeper configuration.</li>
+    <li>Every machine in both clusters must be able to reach every other
+    machine, since replication goes from any region server on the master
+    cluster to any region server on the slave cluster. This also includes
+    the ZooKeeper ensembles.</li>
+    <li>Both clusters should have the same HBase and Hadoop major revision.
+    For example, having 0.21.1 on the master and 0.21.0 on the slave is
+    correct but not 0.21.1 and 0.22.0.</li>
+    <li>Every table should exist with the exact same name and column
+    family names on both clusters.</li>
+</ol>
+
+<p>
+<a name="deployment">
+<h2>Deployment</h2>
+</a>
+<p>
+
+The following steps describe how to enable replication from one cluster
+to another. This must be done with both clusters offline.
+<ol>
+    <li>Copy the hbase-0.21.0-dev-mdc_replication.jar file from the
+    $HBASE_HOME/contrib/mdc_replication/ folder to $HBASE_HOME/lib on
+    both clusters.</li>
+    <li>Edit ${HBASE_HOME}/conf/hbase-site.xml on both clusters to add
+    the following configurations:
+        <pre>
+&lt;property&gt;
+  &lt;name&gt;hbase.regionserver.class&lt;/name&gt;
+  &lt;value&gt;org.apache.hadoop.hbase.ipc.ReplicationRegionInterface&lt;/value&gt;
+&lt;/property&gt;
+&lt;property&gt;
+  &lt;name&gt;hbase.regionserver.impl&lt;/name&gt;
+  &lt;value&gt;org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer&lt;/value&gt;
+&lt;/property&gt;</pre>
+    </li>
+    <li>Run the following command on any cluster:
+    <pre>
+$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/src/contrib/mdc_replication/bin/add_peer.rb</pre>
+    This will print the usage for setting up the replication stream between
+    both clusters. If both clusters use the same ZooKeeper cluster, you have
+    to use a different <b>zookeeper.znode.parent</b> since the two clusters
+    can't write to the same folder.
+    </li>
+    <li>You can now start and stop the clusters with your preferred method.</li>
+</ol>
+
+You can confirm that your setup works by looking at any region server's log
+on the master cluster and looking for the following lines:
+
+<pre>
+Considering 1 rs, with ratio 0.1
+Getting 1 rs from peer cluster # 0
+Choosing peer 10.10.1.49:62020</pre>
+
+In this case, these lines indicate that one region server from the slave cluster
+was chosen for replication.<br><br>
+
+Should you want to stop the replication while the clusters are running, open
+the shell on the master cluster and issue this command:
+<pre>
+hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'</pre>
+
+Replace the znode parent with the one configured on your master
+cluster. Edits that were already queued will still be replicated after you
+issue that command, but new entries won't be. To restart replication, simply
+replace "false" with "true" in the command.
+
+<p>
+
+</body>
+</html>

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,103 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Specialized version of HRegion to handle replication. In particular,
+ * it replays all edits from the reconstruction log.
+ */
+public class ReplicationRegion extends HRegion {
+
+  static final Log LOG = LogFactory.getLog(ReplicationRegion.class);
+
+  private final ReplicationSource replicationSource;
+
+  public ReplicationRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
+      HRegionInfo regionInfo, FlushRequester flushListener,
+      ReplicationSource repSource) {
+    super(basedir, log, fs, conf, regionInfo, flushListener);
+    this.replicationSource = repSource;
+  }
+
+
+  protected void doReconstructionLog(final Path oldLogFile,
+      final long minSeqId, final long maxSeqId, final Progressable reporter)
+      throws UnsupportedEncodingException, IOException {
+    super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
+
+    if(this.replicationSource == null) {
+      return;
+    }
+
+    if (oldLogFile == null || !getFilesystem().exists(oldLogFile)) {
+      return;
+    }
+
+    FileStatus[] stats = getFilesystem().listStatus(oldLogFile);
+    if (stats == null || stats.length == 0) {
+      LOG.warn("Passed reconstruction log " + oldLogFile
+          + " is zero-length");
+      return;
+    }
+
+    HLog.Reader reader = HLog.getReader(getFilesystem(), oldLogFile, getConf());
+    try {
+      HLog.Entry entry;
+      while ((entry = reader.next()) != null) {
+        HLogKey key = entry.getKey();
+        KeyValue val = entry.getEdit();
+        if (key.getLogSeqNum() < maxSeqId) {
+          continue;
+        }
+
+        // Don't replicate catalog entries and meta information like
+        // complete log flush.
+        if(!(Bytes.equals(key.getTablename(),ROOT_TABLE_NAME) ||
+            Bytes.equals(key.getTablename(),META_TABLE_NAME)) &&
+            !Bytes.equals(val.getFamily(), HLog.METAFAMILY)) {
+          this.replicationSource.enqueueLog(entry);
+        }
+
+      }
+    } finally {
+      reader.close();
+    }
+  }
+}

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.ReplicationRPC;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.replication.ReplicationHLog;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperHelper;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ReplicationRegionServer extends HRegionServer
+    implements ReplicationRegionInterface {
+
+  static {
+    ReplicationRPC.initialize();
+  }
+
+  protected static final Log LOG =
+      LogFactory.getLog(ReplicationRegionServer.class);
+
+  private final ReplicationSource replicationSource;
+  private ReplicationSink replicationSink;
+  private final boolean isMaster;
+  private final AtomicBoolean isReplicating = new AtomicBoolean(true);
+
+  private final ReplicationZookeeperHelper zkHelper;
+
+
+  /**
+   * Starts an HRegionServer at the default location
+   *
+   * @param conf the configuration to use
+   * @throws java.io.IOException
+   */
+  public ReplicationRegionServer(Configuration conf) throws IOException {
+    super(conf);
+
+    this.zkHelper = new ReplicationZookeeperHelper(
+        this.getZooKeeperWrapper(), this.conf, this.isReplicating);
+    this.isMaster = zkHelper.isMaster();
+
+    this.replicationSink = null;
+    this.replicationSource = this.isMaster ? new ReplicationSource(this,
+        super.stopRequested, this.isReplicating) : null;
+  }
+
+  @Override
+  protected HLog instantiateHLog(Path logdir) throws IOException {
+    HLog newlog = new ReplicationHLog(super.getFileSystem(),
+        logdir, conf, super.getLogRoller(),
+        this.replicationSource);
+    return newlog;
+  }
+
+  @Override
+  protected void init(final MapWritable c) throws IOException {
+    super.init(c);
+    String n = Thread.currentThread().getName();
+
+    String repLogPathStr =
+      ReplicationSink.getRepLogPath(getHServerInfo().getServerName());
+    Path repLogPath = new Path(getRootDir(), repLogPathStr);
+
+
+    Thread.UncaughtExceptionHandler handler =
+        new Thread.UncaughtExceptionHandler() {
+      public void uncaughtException(final Thread t, final Throwable e) {
+        abort();
+        LOG.fatal("Set stop flag in " + t.getName(), e);
+      }
+    };
+    if(this.isMaster) {
+      Threads.setDaemonThreadRunning(
+          this.replicationSource, n + ".replicationSource", handler);
+    } else {
+      this.replicationSink =
+        new ReplicationSink(conf,super.stopRequested,
+            repLogPath, getFileSystem(), getThreadWakeFrequency());
+      Threads.setDaemonThreadRunning(
+          this.replicationSink, n + ".replicationSink", handler);
+    }
+  }
+
+  @Override
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    HRegion r = new ReplicationRegion(HTableDescriptor.getTableDir(super
+        .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
+        .getFileSystem(), super.conf, regionInfo,
+        super.getFlushRequester(), this.replicationSource);
+
+    r.initialize(null, new Progressable() {
+      public void progress() {
+        addProcessingMessage(regionInfo);
+      }
+    });
+    return r;
+  }
+
+
+  @Override
+  public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+    this.replicationSink.replicateEntries(entries);
+  }
+
+  /**
+   * Get the version of the named protocol.
+   * @param protocol the protocol interface's name
+   * @param clientVersion the version the client speaks
+   * @return the server-side version of the protocol
+   * @throws IOException if the protocol is unknown
+   */
+  public long getProtocolVersion(final String protocol,
+      final long clientVersion)
+  throws IOException {
+    if (protocol.equals(ReplicationRegionInterface.class.getName())) {
+      return HBaseRPCProtocolVersion.versionID;
+    }
+    throw new IOException("Unknown protocol to name node: " + protocol);
+  }
+
+  /**
+   * @return the ZooKeeper helper used for replication
+   */
+  public ReplicationZookeeperHelper getZkHelper() {
+    return zkHelper;
+  }
+
+  protected void join() {
+    super.join();
+    if(this.isMaster) {
+      Threads.shutdown(this.replicationSource);
+    } else {
+      Threads.shutdown(this.replicationSink);
+    }
+  }
+}

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class is responsible for replicating the edits coming
+ * from another cluster. All edits are first put into a log that will
+ * be read later by the main thread.
+ *
+ * The replication process currently waits for queued edits to be applied
+ * before any other entry can be appended to the log.
+ *
+ * The log is rolled, but old logs aren't kept at the moment.
+ */
+public class ReplicationSink extends Thread {
+
+  public static final String REPLICATION_LOG_DIR = ".replogs";
+
+  static final Log LOG = LogFactory.getLog(ReplicationSink.class);
+  private final Configuration conf;
+
+  private final HTablePool pool;
+
+  private final AtomicBoolean stop;
+
+  private HLog.Reader reader;
+
+  private HLog.Writer writer;
+
+  private final FileSystem fs;
+
+  private Path path;
+
+  private long position = 0;
+
+  private final Lock lock = new ReentrantLock();
+
+  private final Condition newData  = lock.newCondition();
+
+  private final AtomicLong editsSize = new AtomicLong(0);
+
+  private long lastEditSize = 0;
+
+  private final long logrollsize;
+
+  private final long threadWakeFrequency;
+
+  /**
+   * Create a sink for replication
+   * @param conf conf object
+   * @param stopper boolean to tell this thread to stop
+   * @param path the path to the log
+   * @param fs the filesystem to use
+   * @param threadWakeFrequency how long should the thread wait for edits
+   * @throws IOException thrown when HDFS goes bad or bad file name
+   */
+  public ReplicationSink(final Configuration conf,
+                         final AtomicBoolean stopper, Path path,
+                         FileSystem fs, long threadWakeFrequency)
+                         throws IOException {
+    this.conf = conf;
+    this.pool = new HTablePool(this.conf, 10);
+    this.stop = stopper;
+    this.fs = fs;
+    this.path = path;
+    long blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
+      this.fs.getDefaultBlockSize());
+    float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
+    this.logrollsize = (long)(blocksize * multi);
+    this.threadWakeFrequency = threadWakeFrequency;
+    rollLog();
+
+  }
+
+  /**
+   * Put this array of entries into a log that will be read later
+   * @param entries
+   * @throws IOException
+   */
+  public void replicateEntries(HLog.Entry[] entries)
+      throws IOException {
+    try {
+      this.lock.lock();
+      if(!this.stop.get()) {
+        // add to WAL and defer actual inserts
+        try {
+          for(HLog.Entry entry : entries) {
+
+            this.writer.append(entry);
+            this.editsSize.addAndGet(entry.getKey().heapSize() +
+                entry.getEdit().heapSize());
+          }
+          this.writer.sync();
+          this.newData.signal();
+
+        } catch (IOException ioe) {
+          LOG.error("Unable to accept edit because", ioe);
+          throw ioe;
+        }
+      } else {
+        LOG.info("Won't be replicating data as we are shutting down");
+      }
+    } finally {
+      this.lock.unlock();
+
+    }
+  }
+
+  public void run() {
+
+    try {
+      HTableInterface table = null;
+      this.lock.lock();
+      while (!this.stop.get()) {
+        this.newData.await(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
+        try {
+          if(this.lastEditSize == this.editsSize.get()) {
+            continue;
+          }
+          // There's no tailing in HDFS so we create a new reader
+          // and seek every time
+          this.reader = HLog.getReader(this.fs, this.path, this.conf);
+
+          if (position != 0) {
+            this.reader.seek(position);
+          }
+
+          byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
+          List<Put> puts = new ArrayList<Put>();
+
+          // Very simple optimization where we batch sequences of rows going
+          // to the same table.
+          HLog.Entry entry = new HLog.Entry();
+          while (this.reader.next(entry) != null) {
+            KeyValue kv = entry.getEdit();
+
+            if (kv.isDelete()) {
+              Delete delete = new Delete(kv.getRow(), kv.getTimestamp(), null);
+              if (kv.isDeleteFamily()) {
+                delete.deleteFamily(kv.getFamily());
+              } else if (!kv.isEmptyColumn()) {
+                delete.deleteColumn(entry.getEdit().getFamily(),
+                    kv.getQualifier());
+              }
+              table = pool.getTable(entry.getKey().getTablename());
+              table.delete(delete);
+              pool.putTable(table);
+
+            } else {
+              Put put = new Put(kv.getRow(), kv.getTimestamp(), null);
+              put.add(entry.getEdit().getFamily(),
+                  kv.getQualifier(), kv.getValue());
+              // Switching table, flush
+              if (!Bytes.equals(lastTable, entry.getKey().getTablename())
+                  && !puts.isEmpty()) {
+                table = pool.getTable(lastTable);
+                table.put(puts);
+                pool.putTable(table);
+                puts.clear();
+              }
+              lastTable = entry.getKey().getTablename();
+              puts.add(put);
+            }
+          }
+
+          if (!puts.isEmpty()) {
+            table = pool.getTable(lastTable);
+            table.put(puts);
+            pool.putTable(table);
+          }
+
+          position = this.reader.getPosition();
+
+          if(this.editsSize.get() > this.logrollsize) {
+            rollLog();
+          }
+          this.lastEditSize = editsSize.get();
+
+
+        } catch (EOFException eof) {
+          LOG.warn("Got EOF while reading, will continue on next notify");
+        } catch (TableNotFoundException ex) {
+          LOG.warn("Losing edits because: " + ex);
+        } finally {
+          this.newData.signal();
+          if(this.reader != null) {
+            this.reader.close();
+          }
+          this.reader = null;
+        }
+
+      }
+      close();
+    } catch (Exception ex) {
+      // Should we log rejected edits in a file for replay?
+      LOG.error("Unable to accept edit because", ex);
+      this.stop.set(true);
+    } finally {
+      this.lock.unlock();
+    }
+  }
+
+  private void close() throws IOException {
+    this.writer.close();
+    if(reader != null) {
+      this.reader.close();
+    }
+    this.fs.delete(this.path,true);
+  }
+
+  // Delete the current log and start a new one with the same name
+  // TODO keep the old versions so that the writing thread isn't held up
+  // by the reading thread, and the latter could be reading older logs.
+  // At this point we are under the lock.
+  protected void rollLog() throws IOException {
+    if (this.editsSize.get() != 0) {
+      this.writer.close();
+      if(this.reader != null) {
+        this.reader.close();
+      }
+      this.fs.delete(this.path,true);
+    }
+    this.writer = HLog.createWriter(this.fs, this.path, this.conf);
+    this.editsSize.set(0);
+    this.position = 0;
+    LOG.debug("New replication log");
+  }
+
+  /**
+   * Get the path of the replication log for this server
+   * @param serverName the name of this server
+   * @return the relative path of the replication log
+   */
+  public static String getRepLogPath(String serverName) {
+    StringBuilder dirName = new StringBuilder(REPLICATION_LOG_DIR);
+    dirName.append("/");
+    dirName.append(serverName);
+    return dirName.toString();
+  }
+}

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.ReplicationConnectionManager;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperHelper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Class that handles the source of a replication stream.
+ * Currently does not handle more than 1 slave.
+ * For each slave cluster, it selects a random subset of peers
+ * using a replication ratio. For example, if the replication ratio is 0.1
+ * and the slave cluster has 100 region servers, 10 will be selected.
+ */
+public class ReplicationSource extends Chore implements HConstants {
+
+  static final Log LOG = LogFactory.getLog(ReplicationSource.class);
+  private final LinkedBlockingQueue<HLog.Entry> queue =
+      new LinkedBlockingQueue<HLog.Entry>();
+  private final List<HLog.Entry> tempArray = new ArrayList<HLog.Entry>();
+  private final HLog.Entry[] dummyArray = new HLog.Entry[0];
+  private final ReplicationConnectionManager conn;
+  private final ReplicationZookeeperHelper zkHelper;
+  private final Configuration conf;
+  private final float ratio;
+  private final Random random;
+  private final AtomicBoolean isReplicating;
+
+  private List<HServerAddress> currentPeers;
+
+  /**
+   * Constructor used by region servers
+   * @param server the region server specialized in replication
+   * @param stopper the atomic boolean to use to stop the cluster
+   * @param isReplicating the atomic boolean that starts/stops replication
+   * @throws IOException
+   */
+  public ReplicationSource(final ReplicationRegionServer server,
+                           final AtomicBoolean stopper,
+                           final AtomicBoolean isReplicating)
+                           throws IOException {
+    super(server.getThreadWakeFrequency(), stopper);
+    this.conf = server.getConfiguration();
+    this.conn = new ReplicationConnectionManager(this.conf);
+    this.zkHelper = server.getZkHelper();
+    this.ratio = this.conf.getFloat("replication.ratio", 0.1f);
+    currentPeers = new ArrayList<HServerAddress>();
+    this.random = new Random();
+    this.isReplicating = isReplicating;
+  }
+
+  @Override
+   protected boolean initialChore() {
+    this.chooseSinksForPeer(0);
+    return currentPeers.size() > 0;
+  }
+
+  /**
+   * Select a number of peers at random using the ratio. Minimum 1.
+   * @param index the index of the peer cluster
+   */
+  private void chooseSinksForPeer(int index) {
+    this.currentPeers.clear();
+    List<HServerAddress> addresses = this.zkHelper.getPeersAddresses(index);
+    Map<String, HServerAddress> mapOfAdr =
+        new HashMap<String, HServerAddress>();
+    LOG.info("Considering " + addresses.size() +
+        " rs, with ratio " + ratio);
+    int nbPeers = (int)(Math.ceil (addresses.size()*ratio));
+    LOG.info("Getting " + nbPeers + " rs from peer cluster # " + index);
+    for(int i = 0; i < nbPeers; i++) {
+      HServerAddress adr =
+          addresses.get(this.random.nextInt(addresses.size()));
+      while(mapOfAdr.containsKey(adr.toString())) {
+        adr = addresses.get(this.random.nextInt(addresses.size()));
+      }
+      LOG.info("Choosing peer " + adr.toString());
+      mapOfAdr.put(adr.toString(), adr);
+    }
+    this.currentPeers.addAll(mapOfAdr.values());
+  }
+
+  /**
+   * Put a log entry in a replication queue if replication is enabled
+   * @param logEntry
+   */
+  public void enqueueLog(HLog.Entry logEntry) {
+    if(this.isReplicating.get()) {
+      this.queue.add(logEntry);
+    }
+  }
+
+  @Override
+  protected void chore() {
+    while(!super.stop.get()) {
+      // Drain the edits accumulated in the queue, select a node at random
+      // and send the edits. If it fails, get a new set of nodes and choose
+      // a new one to replicate to.
+      try {
+        this.queue.drainTo(this.tempArray);
+        if(this.tempArray.size() > 0) {
+          HServerAddress adr =
+              currentPeers.get(random.nextInt(this.currentPeers.size()));
+          ReplicationRegionInterface rrs = this.conn.getHRegionConnection(adr);
+          LOG.debug("Replicating " + this.tempArray.size()
+              + " to " + adr.toString());
+          rrs.replicateLogEntries(this.tempArray.toArray(dummyArray));
+          this.tempArray.clear();
+        }
+        return;
+      }
+      catch (IOException ioe) {
+        LOG.warn("Unable to replicate, retrying with a new node", ioe);
+
+        try{
+          Thread.sleep(1000);
+        } catch (InterruptedException e){
+          // continue
+        }
+
+        // Should wait in a backoff fashion?
+        // make sure we don't retry with the same node
+        chooseSinksForPeer(0);
+      }
+    }
+  }
+
+}

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.replication.ReplicationSource;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
+
+import java.io.IOException;
+
+/**
+ * HLog specialized in replication. It replicates every entry from every
+ * user table at the moment.
+ */
+public class ReplicationHLog extends HLog {
+
+  static final Log LOG = LogFactory.getLog(ReplicationHLog.class);
+
+  private ReplicationSource replicationSource;
+
+  private final boolean isReplicator;
+
+  /**
+   * New constructor used for replication
+   * @param fs filesystem to use
+   * @param dir directory to store the wal
+   * @param conf conf to use
+   * @param listener log listener to pass to super class
+   * @param replicationSource where to put the entries
+   * @throws IOException
+   */
+  public ReplicationHLog(final FileSystem fs, final Path dir,
+                         final Configuration conf,
+                         final LogRollListener listener,
+                         ReplicationSource replicationSource)
+                         throws IOException {
+    super(fs, dir, conf, listener);
+    this.replicationSource = replicationSource;
+    this.isReplicator = this.replicationSource != null;
+  }
+
+  @Override
+  protected void doWrite(HRegionInfo info, HLogKey logKey,
+                         KeyValue logEdit, long now)
+      throws IOException {
+    super.doWrite(info, logKey, logEdit, now);
+    if(this.isReplicator && !(info.isMetaRegion() || info.isRootRegion())) {
+      this.replicationSource.enqueueLog(new Entry(logKey, logEdit));
+    }
+  }
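+
+  // Illustrative wiring (hypothetical names, not part of this patch): a
+  // replicating region server would construct its WAL roughly as
+  //
+  //   ReplicationSource source = ...;  // already started
+  //   HLog log = new ReplicationHLog(fs, new Path(rootDir, "logs"),
+  //       conf, listener, source);
+  //
+  // Passing a null replicationSource makes this class behave as a plain
+  // HLog, since isReplicator is then false and doWrite() never enqueues.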
+
+  public ReplicationSource getReplicationSource() {
+    return this.replicationSource;
+  }
+
+}

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.ipc.RemoteException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Connection manager to communicate with the other clusters.
+ */
+public class ReplicationConnectionManager implements HConstants {
+
+  private final int numRetries;
+  private final int maxRPCAttempts;
+  private final long rpcTimeout;
+  private final Map<String, ReplicationRegionInterface> servers =
+      new ConcurrentHashMap<String, ReplicationRegionInterface>();
+  private final
+    Class<? extends ReplicationRegionInterface> serverInterfaceClass;
+  private final Configuration conf;
+
+  /**
+   * Constructor that sets up RPC to other clusters
+   * @param conf configuration to use
+   */
+  public ReplicationConnectionManager(Configuration conf) {
+    this.conf = conf;
+    String serverClassName =
+        conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS);
+    this.numRetries = conf.getInt("hbase.client.retries.number", 10);
+    this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
+    this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
+    try {
+      this.serverInterfaceClass =
+        (Class<? extends ReplicationRegionInterface>)
+            Class.forName(serverClassName);
+    } catch (ClassNotFoundException e) {
+      throw new UnsupportedOperationException(
+          "Unable to find region server interface " + serverClassName, e);
+    }
+  }
+
+  /**
+   * Get a connection to a distant region server for replication
+   * @param regionServer the address to use
+   * @return the connection to the region server
+   * @throws IOException
+   */
+  public ReplicationRegionInterface getHRegionConnection(
+      HServerAddress regionServer)
+      throws IOException {
+    ReplicationRegionInterface server;
+    synchronized (this.servers) {
+      // See if we already have a connection
+      server = this.servers.get(regionServer.toString());
+      if (server == null) { // Get a connection
+        try {
+          server = (ReplicationRegionInterface) HBaseRPC.waitForProxy(
+              serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
+              regionServer.getInetSocketAddress(), this.conf,
+              this.maxRPCAttempts, this.rpcTimeout);
+        } catch (RemoteException e) {
+          throw RemoteExceptionHandler.decodeRemoteException(e);
+        }
+        this.servers.put(regionServer.toString(), server);
+      }
+    }
+    return server;
+  }
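+
+  // Illustrative use (hypothetical address): given a sink region server
+  // chosen by ReplicationSource, shipping a batch of edits looks roughly like
+  //
+  //   ReplicationConnectionManager conn =
+  //       new ReplicationConnectionManager(conf);
+  //   ReplicationRegionInterface rrs = conn.getHRegionConnection(
+  //       new HServerAddress("10.0.0.1:60020"));
+  //   rrs.replicateLogEntries(entries);
+  //
+  // Connections are cached per address, so repeated calls for the same
+  // server reuse the same proxy.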
+}

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class serves as a helper for everything related to ZooKeeper
+ * in the replication contrib.
+ */
+public class ReplicationZookeeperHelper implements HConstants, Watcher {
+
+  static final Log LOG = LogFactory.getLog(ReplicationZookeeperHelper.class);
+
+  private final ZooKeeperWrapper zookeeperWrapper;
+
+  private final List<ZooKeeperWrapper> peerClusters;
+
+  private final String replicationZNode;
+  private final String peersZNode;
+
+  private final String replicationStateNodeName;
+
+  private final boolean isMaster;
+
+  private final Configuration conf;
+
+  private final AtomicBoolean isReplicating;
+
+  /**
+   * Constructor used by region servers
+   * @param zookeeperWrapper zkw to wrap
+   * @param conf conf to use
+   * @param isReplicating atomic boolean to start/stop replication
+   * @throws IOException
+   */
+  public ReplicationZookeeperHelper(
+      ZooKeeperWrapper zookeeperWrapper, Configuration conf,
+      final AtomicBoolean isReplicating) throws IOException{
+    this.zookeeperWrapper = zookeeperWrapper;
+    this.conf = conf;
+    String replicationZNodeName =
+        conf.get("zookeeper.znode.replication", "replication");
+    String peersZNodeName =
+        conf.get("zookeeper.znode.peers", "peers");
+    String repMasterZNodeName =
+        conf.get("zookeeper.znode.master", "master");
+    this.replicationStateNodeName =
+        conf.get("zookeeper.znode.state", "state");
+
+    this.peerClusters = new ArrayList<ZooKeeperWrapper>();
+    this.replicationZNode = zookeeperWrapper.getZNode(
+        zookeeperWrapper.getParentZNode(),replicationZNodeName);
+    this.peersZNode =
+        zookeeperWrapper.getZNode(replicationZNode,peersZNodeName);
+    
+    List<String> znodes =
+        this.zookeeperWrapper.listZnodes(this.peersZNode, this);
+    if(znodes != null) {
+      for(String znode : znodes) {
+        connectToPeer(znode);
+      }
+    }
+    String address = this.zookeeperWrapper.getData(this.replicationZNode,
+        repMasterZNodeName);
+
+    String thisCluster = this.conf.get(ZOOKEEPER_QUORUM)+":"+
+        this.conf.get("hbase.zookeeper.property.clientPort") +":" +
+        this.conf.get(ZOOKEEPER_ZNODE_PARENT);
+
+    this.isMaster = thisCluster.equals(address);
+    
+    LOG.info("This cluster (" + thisCluster + ") is a "
+        + (this.isMaster ? "master" : "slave") + " for replication" +
+        ", compared with (" + address + ")");
+
+    this.isReplicating = isReplicating;
+
+    setIsReplicating();
+  }
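+
+  // For reference, the znode layout read by this constructor (default names
+  // shown; each is configurable through the zookeeper.znode.* keys above):
+  //
+  //   <parent>/replication           base replication znode
+  //   <parent>/replication/master    address of the master cluster, as
+  //                                  "quorum:clientPort:parentZNode"
+  //   <parent>/replication/peers/*   one child per peer cluster, same format
+  //   <parent>/replication/state     "true" when replication is running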
+
+  /**
+   * Returns all region servers from the given peer cluster
+   * @param clusterIndex the cluster to interrogate
+   * @return addresses of all its region servers, or null if this cluster
+   * has no peers
+   */
+  public List<HServerAddress> getPeersAddresses(int clusterIndex) {
+    return this.peerClusters.size() == 0 ?
+        null : this.peerClusters.get(clusterIndex).scanRSDirectory();
+  }
+
+  // This method connects this cluster to another one and registers it
+  private void connectToPeer(String znode) throws IOException {
+    String[] quorum =
+        this.zookeeperWrapper.getData(this.peersZNode, znode).split(":");
+    if(quorum.length == 3) {
+      Configuration otherConf = new Configuration(this.conf);
+      otherConf.set(ZOOKEEPER_QUORUM, quorum[0]);
+      otherConf.set("hbase.zookeeper.property.clientPort", quorum[1]);
+      otherConf.set(ZOOKEEPER_ZNODE_PARENT, quorum[2]);
+      this.peerClusters.add(new ZooKeeperWrapper(otherConf, this));
+      LOG.info("Added new peer cluster " + StringUtils.arrayToString(quorum));
+    } else {
+      LOG.error("Wrong format of cluster address: " +
+          this.zookeeperWrapper.getData(this.peersZNode, znode));
+    }
+  }
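+
+  // Example of a well-formed peer znode value (hypothetical host):
+  //   "zk.example.org:2181:/hbase"
+  // that is, the peer's quorum, client port and parent znode, the same
+  // triple this cluster compares itself against in the constructor.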
+
+  /**
+   * Tells if this cluster is the master of the replication setup
+   * @return true if this cluster is the replication master
+   */
+  public boolean isMaster() {
+    return isMaster;
+  }
+
+  @Override
+  public void process(WatchedEvent watchedEvent) {
+    Event.EventType type = watchedEvent.getType();
+    LOG.info(("Got event " + type + " with path " + watchedEvent.getPath()));
+    if (type.equals(Event.EventType.NodeDataChanged)) {
+      setIsReplicating();
+    }
+  }
+
+  /**
+   * This reads the state znode for replication and sets the atomic boolean
+   */
+  private void setIsReplicating() {
+    String value = this.zookeeperWrapper.getDataAndWatch(
+        this.replicationZNode, this.replicationStateNodeName, this);
+    if(value != null) {
+      isReplicating.set(value.equals("true"));
+      LOG.info("Replication is now " + (isReplicating.get() ?
+          "started" : "stopped"));
+    }
+  }
+}

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.fs.Path;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TestReplicationSink {
+
+  protected static final Log LOG =
+      LogFactory.getLog(TestReplicationSink.class);
+
+  private static final int BATCH_SIZE = 10;
+
+  private static final long SLEEP_TIME = 500;
+
+  private final static Configuration conf = HBaseConfiguration.create();
+
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  private static ReplicationSink SINK;
+
+  private static final byte[] TABLE_NAME1 =
+      Bytes.toBytes("table1");
+  private static final byte[] TABLE_NAME2 =
+      Bytes.toBytes("table2");
+
+  private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
+  private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
+
+  private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
+
+  private static HTable table1;
+
+  private static HTable table2;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    TEST_UTIL.startMiniCluster(3);
+    Path repLogPath = new Path(TEST_UTIL.getTestDir(),
+        ReplicationSink.getRepLogPath("test_rep_sink"));
+    SINK = new ReplicationSink(conf,STOPPER,
+        TEST_UTIL.getTestDir(),
+        TEST_UTIL.getDFSCluster().getFileSystem(), 1000);
+    SINK.start();
+    table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
+    table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    STOPPER.set(true);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
+    table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
+    Thread.sleep(SLEEP_TIME);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {}
+
+  @Test
+  public void testBatchSink() throws Exception {
+    HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
+    for(int i = 0; i < BATCH_SIZE; i++) {
+      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+    }
+    SINK.replicateEntries(entries);
+    Thread.sleep(SLEEP_TIME);
+    Scan scan = new Scan();
+    ResultScanner scanRes = table1.getScanner(scan);
+    assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
+  }
+
+  @Test
+  public void testMixedPutDelete() throws Exception {
+    HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
+    for(int i = 0; i < BATCH_SIZE; i+=2) {
+      entries[i/2] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+    }
+    SINK.replicateEntries(entries);
+    Thread.sleep(SLEEP_TIME);
+
+    entries = new HLog.Entry[BATCH_SIZE];
+    for(int i = 0; i < BATCH_SIZE; i++) {
+      entries[i] = createEntry(TABLE_NAME1, i,
+          i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn);
+    }
+
+    SINK.replicateEntries(entries);
+    Thread.sleep(SLEEP_TIME);
+    Scan scan = new Scan();
+    ResultScanner scanRes = table1.getScanner(scan);
+    assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
+  }
+
+  @Test
+  public void testMixedPutTables() throws Exception {
+    HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
+    for(int i = 0; i < BATCH_SIZE; i++) {
+      entries[i] =
+          createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
+              i, KeyValue.Type.Put);
+    }
+
+    SINK.replicateEntries(entries);
+    Thread.sleep(SLEEP_TIME);
+    Scan scan = new Scan();
+    ResultScanner scanRes = table2.getScanner(scan);
+    for(Result res : scanRes) {
+      assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+    }
+  }
+
+  @Test
+  public void testMixedDeletes() throws Exception {
+    HLog.Entry[] entries = new HLog.Entry[3];
+    for(int i = 0; i < 3; i++) {
+      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+    }
+    SINK.replicateEntries(entries);
+    Thread.sleep(SLEEP_TIME);
+    entries = new HLog.Entry[3];
+
+    entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn);
+    entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
+    entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn);
+
+    SINK.replicateEntries(entries);
+    Thread.sleep(SLEEP_TIME);
+    
+    Scan scan = new Scan();
+    ResultScanner scanRes = table1.getScanner(scan);
+    assertEquals(0, scanRes.next(3).length);
+  }
+
+  @Test
+  public void testRolling() throws Exception {
+    testMixedDeletes();
+    SINK.rollLog();
+    testMixedDeletes();
+    SINK.rollLog();
+    testMixedPutTables();
+  }
+
+  private HLog.Entry createEntry(byte[] table, int row, KeyValue.Type type) {
+    byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
+    byte[] rowBytes = Bytes.toBytes(row);
+    final long now = System.currentTimeMillis();
+    KeyValue kv = null;
+    if(type.getCode() == KeyValue.Type.Put.getCode()) {
+      kv = new KeyValue(rowBytes, fam, fam, now,
+          KeyValue.Type.Put, Bytes.toBytes(row));
+    } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
+      kv = new KeyValue(rowBytes, fam, fam,
+          now, KeyValue.Type.DeleteColumn);
+    } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
+      kv = new KeyValue(rowBytes, fam, null,
+          now, KeyValue.Type.DeleteFamily);
+    }
+
+    HLogKey key = new HLogKey(table, table, now, now);
+
+    return new HLog.Entry(key, kv);
+  }
+}

Added: hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java?rev=907011&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java (added)
+++ hadoop/hbase/trunk/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java Fri Feb  5 17:37:54 2010
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertArrayEquals;
+
+import org.junit.*;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+
+public class TestReplication implements HConstants {
+
+  protected static final Log LOG = LogFactory.getLog(TestReplication.class);
+
+  private Configuration conf1;
+  private Configuration conf2;
+
+  private ZooKeeperWrapper zkw1;
+  private ZooKeeperWrapper zkw2;
+
+  private HBaseTestingUtility utility1;
+  private HBaseTestingUtility utility2;
+
+  private final int NB_ROWS_IN_BATCH = 100;
+  private final long SLEEP_TIME = 500;
+  private final int NB_RETRIES = 10;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    conf1 = HBaseConfiguration.create();
+    conf1.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+        .getName());
+    conf1.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+        .getName());
+    conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
+
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    zkw1 = new ZooKeeperWrapper(conf1, EmptyWatcher.instance);
+    zkw1.writeZNode("/1", "replication", "");
+    zkw1.writeZNode("/1/replication", "master",
+        conf1.get(ZOOKEEPER_QUORUM) + ":" +
+        conf1.get("hbase.zookeeper.property.clientPort") + ":/1");
+    setIsReplication("true");
+
+    LOG.info("Setup first Zk");
+
+    conf2 = HBaseConfiguration.create();
+    conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+        .getName());
+    conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+        .getName());
+    conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
+
+    utility2 = new HBaseTestingUtility(conf2);
+    utility2.setZkCluster(miniZK);
+    zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance);
+    zkw2.writeZNode("/2", "replication", "");
+    zkw2.writeZNode("/2/replication", "master",
+        conf1.get(ZOOKEEPER_QUORUM) + ":" +
+        conf1.get("hbase.zookeeper.property.clientPort") + ":/1");
+
+    zkw1.writeZNode("/1/replication/peers", "test",
+        conf2.get(ZOOKEEPER_QUORUM) + ":" +
+        conf2.get("hbase.zookeeper.property.clientPort") + ":/2");
+
+    LOG.info("Setup second Zk");
+  }
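+
+  // setUp() leaves the shared mini ZK with the following layout; both
+  // clusters name /1 as the replication master and /1 lists /2 as its
+  // peer "test":
+  //
+  //   /1/replication/master     = <quorum>:<clientPort>:/1
+  //   /1/replication/peers/test = <quorum>:<clientPort>:/2
+  //   /1/replication/state      = "true"
+  //   /2/replication/master     = <quorum>:<clientPort>:/1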
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {}
+
+  @Test
+  public void testReplication() throws Exception {
+    utility1.startMiniCluster();
+    utility2.startMiniCluster();
+
+    byte[] tableName = Bytes.toBytes("test");
+    byte[] famName = Bytes.toBytes("f");
+    byte[] row = Bytes.toBytes("row");
+
+    HTableDescriptor table = new HTableDescriptor(tableName);
+    HColumnDescriptor fam = new HColumnDescriptor(famName);
+    table.addFamily(fam);
+
+    HBaseAdmin admin1 = new HBaseAdmin(conf1);
+    HBaseAdmin admin2 = new HBaseAdmin(conf2);
+    admin1.createTable(table);
+    admin2.createTable(table);
+
+    Put put = new Put(row);
+    put.add(famName, row, row);
+
+    HTable table1 = new HTable(conf1, tableName);
+    table1.put(put);
+
+    HTable table2 = new HTable(conf2, tableName);
+    Get get = new Get(row);
+    for(int i = 0; i < NB_RETRIES; i++) {
+      if(i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = table2.get(get);
+      if(res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(row, res.value());
+        break;
+      }
+    }
+
+    Delete del = new Delete(row);
+    table1.delete(del);
+
+    table2 = new HTable(conf2, tableName);  
+    get = new Get(row);
+    for(int i = 0; i < NB_RETRIES; i++) {
+      if(i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = table2.get(get);
+      if(res.size() >= 1) {
+        LOG.info("Row not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+
+    // Normal batch tests
+    table1.setAutoFlush(false);
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      put = new Put(Bytes.toBytes(i));
+      put.add(famName, row, row);
+      table1.put(put);
+    }
+    table1.flushCommits();
+
+    Scan scan = new Scan();
+
+    for(int i = 0; i < NB_RETRIES; i++) {
+      if(i==NB_RETRIES-1) {
+        fail("Waited too much time for normal batch replication");
+      }
+      ResultScanner scanner = table2.getScanner(scan);
+      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+      scanner.close();
+      if(res.length != NB_ROWS_IN_BATCH) {
+        LOG.info("Only got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+
+    table1.setAutoFlush(true);
+
+    // Test stopping replication
+    setIsReplication("false");
+
+    // Takes some ms for ZK to fire the watcher
+    Thread.sleep(100);
+
+    put = new Put(Bytes.toBytes("stop start"));
+    put.add(famName, row, row);
+    table1.put(put);
+
+    get = new Get(Bytes.toBytes("stop start"));
+    for(int i = 0; i < NB_RETRIES; i++) {
+      if(i==NB_RETRIES-1) {
+        break;
+      }
+      Result res = table2.get(get);
+      if(res.size() >= 1) {
+        fail("Replication wasn't stopped");
+
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    // Test restart replication
+
+    setIsReplication("true");
+
+    Thread.sleep(100);
+
+    table1.put(put);
+
+    for(int i = 0; i < NB_RETRIES; i++) {
+      if(i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = table2.get(get);
+      if(res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(row, res.value());
+        break;
+      }
+    }
+
+  }
+
+  private void setIsReplication(String bool) throws Exception {
+    zkw1.writeZNode("/1/replication", "state", bool);
+  }
+}


