hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1170950 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/client/replication/ src/main/java/org/apache/hadoop/hbase/replication/ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ src/main/ruby/ src/main/ruby/hbase/ ...
Date Thu, 15 Sep 2011 04:16:54 GMT
Author: tedyu
Date: Thu Sep 15 04:16:53 2011
New Revision: 1170950

URL: http://svn.apache.org/viewvc?rev=1170950&view=rev
Log:
HBASE-2196  Support more than one slave cluster (Lars Hofhansl)

Added:
    hbase/trunk/src/main/ruby/shell/commands/list_peers.rb
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/trunk/src/main/ruby/hbase/replication_admin.rb
    hbase/trunk/src/main/ruby/shell.rb
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1170950&r1=1170949&r2=1170950&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Sep 15 04:16:53 2011
@@ -507,6 +507,7 @@ Release 0.91.0 - Unreleased
                (Li Pi)
    HBASE-4296  Deprecate HTable[Interface].getRowOrBefore(...) (Lars Hofhansl)
    HBASE-2195  Support cyclic replication (Lars Hofhansl)
+   HBASE-2196  Support more than one slave cluster (Lars Hofhansl)
 
   NEW FEATURES
    HBASE-2001  Coprocessors: Colocate user code with regions (Mingjie Lai via

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1170950&r1=1170949&r2=1170950&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Thu Sep 15 04:16:53 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client.r
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
@@ -134,6 +135,14 @@ public class ReplicationAdmin implements
   }
 
   /**
+   * Map of this cluster's peers for display.
+   * @return A map of peer ids to peer cluster keys
+   */
+  public Map<String, String> listPeers() {
+    return this.replicationZk.listPeers();
+  }
+
+  /**
    * Get the current status of the kill switch, if the cluster is replicating
    * or not.
    * @return true if the cluster is replicated, otherwise false

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1170950&r1=1170949&r2=1170950&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Thu Sep 15 04:16:53 2011
@@ -187,6 +187,24 @@ public class ReplicationZookeeper {
   }
 
   /**
+   * Map of this cluster's peers for display.
+   * @return A map of peer ids to peer cluster keys
+   */
+  public Map<String,String> listPeers() {
+    Map<String,String> peers = new TreeMap<String,String>();
+    List<String> ids = null;
+    try {
+      ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+      for (String id : ids) {
+        peers.put(id, Bytes.toString(ZKUtil.getData(this.zookeeper,
+            ZKUtil.joinZNode(this.peersZNode, id))));
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the list of peers ", e);
+    }
+    return peers;
+  }
+  /**
    * Returns all region servers from given peer
    *
    * @param peerClusterId (byte) the cluster to interrogate
@@ -264,10 +282,6 @@ public class ReplicationZookeeper {
     }
     if (this.peerClusters.containsKey(peerId)) {
       return false;
-      // TODO remove when we support it
-    } else if (this.peerClusters.size() > 0) {
-      LOG.warn("Multiple slaves feature not supported");
-      return false;
     }
     ReplicationPeer peer = getPeer(peerId);
     if (peer == null) {
@@ -351,8 +365,6 @@ public class ReplicationZookeeper {
     try {
       if (peerExists(id)) {
         throw new IllegalArgumentException("Cannot add existing peer");
-      } else if (countPeers() > 0) {
-        throw new IllegalStateException("Multi-slave isn't supported yet");
       }
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper,
@@ -367,12 +379,6 @@ public class ReplicationZookeeper {
           ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
   }
 
-  private int countPeers() throws KeeperException {
-    List<String> peers =
-        ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-    return peers == null ? 0 : peers.size();
-  }
-
   /**
    * This reads the state znode for replication and sets the atomic boolean
    */
@@ -408,11 +414,11 @@ public class ReplicationZookeeper {
   /**
    * Add a new log to the list of hlogs in zookeeper
    * @param filename name of the hlog's znode
-   * @param clusterId name of the cluster's znode
+   * @param peerId name of the cluster's znode
    */
-  public void addLogToList(String filename, String clusterId) {
+  public void addLogToList(String filename, String peerId) {
     try {
-      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
+      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId);
       znode = ZKUtil.joinZNode(znode, filename);
       ZKUtil.createWithParents(this.zookeeper, znode);
     } catch (KeeperException e) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1170950&r1=1170949&r2=1170950&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Sep 15 04:16:53 2011
@@ -43,7 +43,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
@@ -96,7 +95,7 @@ public class ReplicationSource extends T
   // Should we stop everything?
   private Stoppable stopper;
   // List of chosen sinks (region servers)
-  private List<HServerAddress> currentPeers;
+  private List<ServerName> currentPeers;
   // How long should we sleep for each retry
   private long sleepForRetries;
   // Max size in bytes of entriesArray
@@ -173,7 +172,7 @@ public class ReplicationSource extends T
     this.conn = HConnectionManager.getConnection(conf);
     this.zkHelper = manager.getRepZkWrapper();
     this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
-    this.currentPeers = new ArrayList<HServerAddress>();
+    this.currentPeers = new ArrayList<ServerName>();
     this.random = new Random();
     this.replicating = replicating;
     this.manager = manager;
@@ -215,19 +214,18 @@ public class ReplicationSource extends T
     this.currentPeers.clear();
     List<ServerName> addresses =
         this.zkHelper.getSlavesAddresses(peerId);
-    Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
+    Set<ServerName> setOfAddr = new HashSet<ServerName>();
     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
     LOG.info("Getting " + nbPeers +
         " rs from peer cluster # " + peerId);
     for (int i = 0; i < nbPeers; i++) {
-      HServerAddress address;
+      ServerName sn;
       // Make sure we get one address that we don't already have
       do {
-        ServerName sn = addresses.get(this.random.nextInt(addresses.size()));
-        address = new HServerAddress(sn.getHostname(), sn.getPort());
-      } while (setOfAddr.contains(address));
-      LOG.info("Choosing peer " + address);
-      setOfAddr.add(address);
+        sn = addresses.get(this.random.nextInt(addresses.size()));
+      } while (setOfAddr.contains(sn));
+      LOG.info("Choosing peer " + sn);
+      setOfAddr.add(sn);
     }
     this.currentPeers.addAll(setOfAddr);
   }
@@ -694,9 +692,9 @@ public class ReplicationSource extends T
     if (this.currentPeers.size() == 0) {
       throw new IOException(this.peerClusterZnode + " has 0 region servers");
     }
-    HServerAddress address =
+    ServerName address =
         currentPeers.get(random.nextInt(this.currentPeers.size()));
-    return this.conn.getHRegionConnection(address);
+    return this.conn.getHRegionConnection(address.getHostname(), address.getPort());
   }
 
   /**

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1170950&r1=1170949&r2=1170950&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Thu Sep 15 04:16:53 2011
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.replicat
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -70,7 +72,7 @@ public class ReplicationSourceManager {
   // All about stopping
   private final Stoppable stopper;
   // All logs we are currently trackign
-  private final SortedSet<String> hlogs;
+  private final Map<String, SortedSet<String>> hlogsById;
   private final Configuration conf;
   private final FileSystem fs;
   // The path to the latest log we saw, for new coming sources
@@ -108,7 +110,7 @@ public class ReplicationSourceManager {
     this.replicating = replicating;
     this.zkHelper = zkHelper;
     this.stopper = stopper;
-    this.hlogs = new TreeSet<String>();
+    this.hlogsById = new HashMap<String, SortedSet<String>>();
     this.oldsources = new ArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -149,14 +151,15 @@ public class ReplicationSourceManager {
   public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
     String key = log.getName();
     LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
-    this.zkHelper.writeReplicationStatus(key.toString(), id, position);
-    synchronized (this.hlogs) {
-      if (!queueRecovered && this.hlogs.first() != key) {
-        SortedSet<String> hlogSet = this.hlogs.headSet(key);
+    this.zkHelper.writeReplicationStatus(key, id, position);
+    synchronized (this.hlogsById) {
+      SortedSet<String> hlogs = this.hlogsById.get(id);
+      if (!queueRecovered && hlogs.first() != key) {
+        SortedSet<String> hlogSet = hlogs.headSet(key);
         LOG.info("Removing " + hlogSet.size() +
             " logs in the list: " + hlogSet);
         for (String hlog : hlogSet) {
-          this.zkHelper.removeLogFromList(hlog.toString(), id);
+          this.zkHelper.removeLogFromList(hlog, id);
         }
         hlogSet.clear();
       }
@@ -200,12 +203,14 @@ public class ReplicationSourceManager {
         getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
     // TODO set it to what's in ZK
     src.setSourceEnabled(true);
-    synchronized (this.hlogs) {
+    synchronized (this.hlogsById) {
       this.sources.add(src);
-      if (this.hlogs.size() > 0) {
-        // Add the latest hlog to that source's queue
-        this.zkHelper.addLogToList(this.hlogs.last(),
-            this.sources.get(0).getPeerClusterZnode());
+      this.hlogsById.put(id, new TreeSet<String>());
+      // Add the latest hlog to that source's queue
+      if (this.latestPath != null) {
+        String name = this.latestPath.getName();
+        this.hlogsById.get(id).add(name);
+        this.zkHelper.addLogToList(name, src.getPeerClusterZnode());
         src.enqueueLog(this.latestPath);
       }
     }
@@ -230,8 +235,8 @@ public class ReplicationSourceManager {
    * Get a copy of the hlogs of the first source on this rs
    * @return a sorted set of hlog names
    */
-  protected SortedSet<String> getHLogs() {
-    return new TreeSet<String>(this.hlogs);
+  protected Map<String, SortedSet<String>> getHLogs() {
+    return Collections.unmodifiableMap(hlogsById);
   }
 
   /**
@@ -248,21 +253,25 @@ public class ReplicationSourceManager {
       return;
     }
 
-    synchronized (this.hlogs) {
-      if (this.sources.size() > 0) {
-        this.zkHelper.addLogToList(newLog.getName(),
-            this.sources.get(0).getPeerClusterZnode());
-      } else {
-        // If there's no slaves, don't need to keep the old hlogs since
-        // we only consider the last one when a new slave comes in
-        this.hlogs.clear();
+    synchronized (this.hlogsById) {
+      String name = newLog.getName();
+      for (ReplicationSourceInterface source : this.sources) {
+        this.zkHelper.addLogToList(name, source.getPeerClusterZnode());
+      }
+      for (SortedSet<String> hlogs : this.hlogsById.values()) {
+        if (this.sources.isEmpty()) {
+          // If there's no slaves, don't need to keep the old hlogs since
+          // we only consider the last one when a new slave comes in
+          hlogs.clear();
+        }
+        hlogs.add(name);
       }
-      this.hlogs.add(newLog.getName());
     }
+
     this.latestPath = newLog;
-    // This only update the sources we own, not the recovered ones
+    // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources) {
-      source.enqueueLog(newLog);
+      source.enqueueLog(newLog);    
     }
   }
 
@@ -281,7 +290,7 @@ public class ReplicationSourceManager {
    * @param manager the manager to use
    * @param stopper the stopper object for this region server
    * @param replicating the status of the replication on this cluster
-   * @param peerClusterId the id of the peer cluster
+   * @param peerId the id of the peer cluster
    * @return the created source
    * @throws IOException
    */
@@ -291,7 +300,7 @@ public class ReplicationSourceManager {
       final ReplicationSourceManager manager,
       final Stoppable stopper,
       final AtomicBoolean replicating,
-      final String peerClusterId) throws IOException {
+      final String peerId) throws IOException {
     ReplicationSourceInterface src;
     try {
       @SuppressWarnings("rawtypes")
@@ -299,12 +308,12 @@ public class ReplicationSourceManager {
           ReplicationSource.class.getCanonicalName()));
       src = (ReplicationSourceInterface) c.newInstance();
     } catch (Exception e) {
-      LOG.warn("Passed replication source implemention throws errors, " +
+      LOG.warn("Passed replication source implementation throws errors, " +
           "defaulting to ReplicationSource", e);
       src = new ReplicationSource();
 
     }
-    src.init(conf, fs, manager, stopper, replicating, peerClusterId);
+    src.init(conf, fs, manager, stopper, replicating, peerId);
     return src;
   }
 
@@ -410,7 +419,7 @@ public class ReplicationSourceManager {
         return;
       }
       LOG.info(path + " znode expired, trying to lock it");
-      transferQueues(zkHelper.getZNodeName(path));
+      transferQueues(ReplicationZookeeper.getZNodeName(path));
     }
 
     /**
@@ -462,7 +471,7 @@ public class ReplicationSourceManager {
       if (peers == null) {
         return;
       }
-      String id = zkHelper.getZNodeName(path);
+      String id = ReplicationZookeeper.getZNodeName(path);
       removePeer(id);
     }
 

Modified: hbase/trunk/src/main/ruby/hbase/replication_admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/replication_admin.rb?rev=1170950&r1=1170949&r2=1170950&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/replication_admin.rb (original)
+++ hbase/trunk/src/main/ruby/hbase/replication_admin.rb Thu Sep 15 04:16:53 2011
@@ -44,6 +44,12 @@ module Hbase
     end
 
     #----------------------------------------------------------------------------------------------
+    # List all peer clusters
+    def list_peers
+      @replication_admin.listPeers
+    end
+
+    #----------------------------------------------------------------------------------------------
     # Restart the replication stream to the specified peer
     def enable_peer(id)
       @replication_admin.enablePeer(id)

Modified: hbase/trunk/src/main/ruby/shell.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell.rb?rev=1170950&r1=1170949&r2=1170950&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/shell.rb (original)
+++ hbase/trunk/src/main/ruby/shell.rb Thu Sep 15 04:16:53 2011
@@ -276,6 +276,7 @@ Shell.load_command_group(
   :commands => %w[
     add_peer
     remove_peer
+    list_peers
     enable_peer
     disable_peer
     start_replication

Added: hbase/trunk/src/main/ruby/shell/commands/list_peers.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/list_peers.rb?rev=1170950&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/list_peers.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/list_peers.rb Thu Sep 15 04:16:53 2011
@@ -0,0 +1,46 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class ListPeers< Command
+      def help
+        return <<-EOF
+List all replication peer clusters.
+
+  hbase> list_peers
+EOF
+      end
+
+      def command()
+        now = Time.now
+        peers = replication_admin.list_peers
+
+        formatter.header(["PEER ID", "CLUSTER KEY"])
+
+        peers.entrySet().each do |e|
+          formatter.row([ e.key, e.value ])
+        end
+
+        formatter.footer(now)
+      end
+    end
+  end
+end

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java?rev=1170950&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java Thu Sep 15 04:16:53 2011
@@ -0,0 +1,263 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMultiSlaveReplication {
+
+  private static final Log LOG = LogFactory.getLog(TestReplication.class);
+
+  private static Configuration conf1;
+  private static Configuration conf2;
+  private static Configuration conf3;
+
+  private static String clusterKey2;
+  private static String clusterKey3;
+
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
+  private static HBaseTestingUtility utility3;
+  private static final long SLEEP_TIME = 500;
+  private static final int NB_RETRIES = 10;
+
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] famName = Bytes.toBytes("f");
+  private static final byte[] row = Bytes.toBytes("row");
+  private static final byte[] row1 = Bytes.toBytes("row1");
+  private static final byte[] row2 = Bytes.toBytes("row2");
+  private static final byte[] row3 = Bytes.toBytes("row3");
+  private static final byte[] noRepfamName = Bytes.toBytes("norep");
+
+  private static HTableDescriptor table;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1 = HBaseConfiguration.create();
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    // smaller block size and capacity to trigger more operations
+    // and test them
+    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
+    conf1.setInt("replication.source.size.capacity", 1024);
+    conf1.setLong("replication.source.sleepforretries", 100);
+    conf1.setInt("hbase.regionserver.maxlogs", 10);
+    conf1.setLong("hbase.master.logcleaner.ttl", 10);
+    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    conf1.setBoolean("dfs.support.append", true);
+    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
+
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    new ZooKeeperWatcher(conf1, "cluster1", null, true);
+
+    conf2 = new Configuration(conf1);
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+
+    conf3 = new Configuration(conf1);
+    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
+
+    utility2 = new HBaseTestingUtility(conf2);
+    utility2.setZkCluster(miniZK);
+    new ZooKeeperWatcher(conf2, "cluster3", null, true);
+
+    utility3 = new HBaseTestingUtility(conf3);
+    utility3.setZkCluster(miniZK);
+    new ZooKeeperWatcher(conf3, "cluster3", null, true);
+
+    clusterKey2 = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+    conf2.get("hbase.zookeeper.property.clientPort")+":/2";
+
+    clusterKey3 = conf3.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+    conf3.get("hbase.zookeeper.property.clientPort")+":/3";
+    
+    table = new HTableDescriptor(tableName);
+    HColumnDescriptor fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    table.addFamily(fam);
+  }
+
+  @Test(timeout=300000)
+  public void testMultiSlaveReplication() throws Exception {
+    LOG.info("testCyclicReplication");
+    MiniHBaseCluster master = utility1.startMiniCluster();
+    utility2.startMiniCluster();
+    utility3.startMiniCluster();
+    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
+
+    new HBaseAdmin(conf1).createTable(table);
+    new HBaseAdmin(conf2).createTable(table);
+    new HBaseAdmin(conf3).createTable(table);
+    HTable htable1 = new HTable(conf1, tableName);
+    htable1.setWriteBufferSize(1024);
+    HTable htable2 = new HTable(conf2, tableName);
+    htable2.setWriteBufferSize(1024);
+    HTable htable3 = new HTable(conf3, tableName);
+    htable3.setWriteBufferSize(1024);
+    
+    admin1.addPeer("1", clusterKey2);
+
+    // put "row" and wait 'til it got around, then delete
+    putAndWait(row, famName, htable1, htable2);
+    deleteAndWait(row, htable1, htable2);
+    // check it wasn't replicated to cluster 3
+    checkRow(row,0,htable3);
+
+    putAndWait(row2, famName, htable1, htable2);
+
+    // now roll the region server's logs
+    new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0).getServerName().toString());
+    // after the log was rolled put a new row
+    putAndWait(row3, famName, htable1, htable2);
+
+    admin1.addPeer("2", clusterKey3);
+
+    // put a row, check it was replicated to all clusters
+    putAndWait(row1, famName, htable1, htable2, htable3);
+    // delete and verify
+    deleteAndWait(row1, htable1, htable2, htable3);
+
+    // make sure row2 did not get replicated after
+    // cluster 3 was added
+    checkRow(row2,0,htable3);
+
+    // row3 will get replicated, because it was in the
+    // latest log
+    checkRow(row3,1,htable3);
+
+    Put p = new Put(row);
+    p.add(famName, row, row);
+    htable1.put(p);
+    // now roll the logs again
+    new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0)
+        .getServerName().toString());
+
+    // cleanup "row2", also conveniently use this to wait for replication
+    // to finish
+    deleteAndWait(row2, htable1, htable2, htable3);
+    // Even if the log was rolled in the middle of the replication
+    // "row" is still replicated.
+    checkRow(row, 1, htable2, htable3);
+
+    // cleanup the rest
+    deleteAndWait(row, htable1, htable2, htable3);
+    deleteAndWait(row3, htable1, htable2, htable3);
+
+    utility3.shutdownMiniCluster();
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+  private void checkRow(byte[] row, int count, HTable... tables) throws IOException {
+    Get get = new Get(row);
+    for (HTable table : tables) {
+      Result res = table.get(get);
+      assertEquals(count, res.size());
+    }
+  }
+
+  private void deleteAndWait(byte[] row, HTable source, HTable... targets)
+  throws Exception {
+    Delete del = new Delete(row);
+    source.delete(del);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      boolean removedFromAll = true;
+      for (HTable target : targets) {
+        Result res = target.get(get);
+        if (res.size() >= 1) {
+          LOG.info("Row not deleted");
+          removedFromAll = false;
+          break;
+        }
+      }
+      if (removedFromAll) {
+        break;
+      } else {
+        Thread.sleep(SLEEP_TIME);        
+      }
+    }
+  }
+
+  private void putAndWait(byte[] row, byte[] fam, HTable source, HTable... targets)
+  throws Exception {
+    Put put = new Put(row);
+    put.add(fam, row, row);
+    source.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      boolean replicatedToAll = true;
+      for (HTable target : targets) {
+        Result res = target.get(get);
+        if (res.size() == 0) {
+          LOG.info("Row not available");
+          replicatedToAll = false;
+          break;
+        } else {
+          assertArrayEquals(res.value(), row);
+        }
+      }
+      if (replicatedToAll) {
+        break;
+      } else {
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1170950&r1=1170949&r2=1170950&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Thu Sep 15 04:16:53 2011
@@ -81,6 +81,8 @@ public class TestReplicationSourceManage
 
   private static final byte[] test = Bytes.toBytes("test");
 
+  private static final String slaveId = "1";
+
   private static FileSystem fs;
 
   private static Path oldLogDir;
@@ -115,7 +117,7 @@ public class TestReplicationSourceManage
     logDir = new Path(utility.getTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
 
-    manager.addSource("1");
+    manager.addSource(slaveId);
 
     htd = new HTableDescriptor(test);
     HColumnDescriptor col = new HColumnDescriptor("f1");
@@ -188,7 +190,7 @@ public class TestReplicationSourceManage
       hlog.append(hri, key, edit, htd);
     }
 
-    assertEquals(6, manager.getHLogs().size());
+    assertEquals(6, manager.getHLogs().get(slaveId).size());
 
     hlog.rollWriter();
 



Mime
View raw message