hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r991397 [10/15] - in /hbase/trunk: ./ bin/ conf/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/avro/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/o...
Date Tue, 31 Aug 2010 23:51:50 GMT
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Tue Aug 31 23:51:44 2010
@@ -39,7 +39,8 @@ import org.apache.hadoop.io.WritableComp
  * associated row.
  */
 public class HLogKey implements WritableComparable<HLogKey> {
-  private byte [] regionName;
+  // The encoded region name.
+  private byte [] encodedRegionName;
   private byte [] tablename;
   private long logSeqNum;
   // Time at which this edit was written.
@@ -57,27 +58,24 @@ public class HLogKey implements Writable
    * We maintain the tablename mainly for debugging purposes.
    * A regionName is always a sub-table object.
    *
-   * @param regionName  - name of region
+   * @param encodedRegionName Encoded name of the region as returned by
+   * {@link HRegionInfo#getEncodedNameAsBytes()}.
    * @param tablename   - name of table
    * @param logSeqNum   - log sequence number
    * @param now Time at which this edit was written.
    */
-  public HLogKey(final byte [] regionName, final byte [] tablename,
+  public HLogKey(final byte [] encodedRegionName, final byte [] tablename,
       long logSeqNum, final long now) {
-    this.regionName = regionName;
+    this.encodedRegionName = encodedRegionName;
     this.tablename = tablename;
     this.logSeqNum = logSeqNum;
     this.writeTime = now;
     this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
   }
 
-  //////////////////////////////////////////////////////////////////////////////
-  // A bunch of accessors
-  //////////////////////////////////////////////////////////////////////////////
-
-  /** @return region name */
-  public byte [] getRegionName() {
-    return regionName;
+  /** @return encoded region name */
+  public byte [] getEncodedRegionName() {
+    return encodedRegionName;
   }
 
   /** @return table name */
@@ -119,7 +117,7 @@ public class HLogKey implements Writable
 
   @Override
   public String toString() {
-    return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
+    return Bytes.toString(tablename) + "/" + Bytes.toString(encodedRegionName) + "/" +
       logSeqNum;
   }
 
@@ -136,7 +134,7 @@ public class HLogKey implements Writable
 
   @Override
   public int hashCode() {
-    int result = Bytes.hashCode(this.regionName);
+    int result = Bytes.hashCode(this.encodedRegionName);
     result ^= this.logSeqNum;
     result ^= this.writeTime;
     result ^= this.clusterId;
@@ -144,7 +142,7 @@ public class HLogKey implements Writable
   }
 
   public int compareTo(HLogKey o) {
-    int result = Bytes.compareTo(this.regionName, o.regionName);
+    int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
     if (result == 0) {
       if (this.logSeqNum < o.logSeqNum) {
         result = -1;
@@ -163,7 +161,7 @@ public class HLogKey implements Writable
   }
 
   public void write(DataOutput out) throws IOException {
-    Bytes.writeByteArray(out, this.regionName);
+    Bytes.writeByteArray(out, this.encodedRegionName);
     Bytes.writeByteArray(out, this.tablename);
     out.writeLong(this.logSeqNum);
     out.writeLong(this.writeTime);
@@ -171,7 +169,7 @@ public class HLogKey implements Writable
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.regionName = Bytes.readByteArray(in);
+    this.encodedRegionName = Bytes.readByteArray(in);
     this.tablename = Bytes.readByteArray(in);
     this.logSeqNum = in.readLong();
     this.writeTime = in.readLong();
@@ -181,5 +179,4 @@ public class HLogKey implements Writable
       // Means it's an old key, just continue
     }
   }
-
-}
+}
\ No newline at end of file

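With this change HLogKey is keyed by the encoded region name rather than the full region name. A minimal usage sketch; the table name, sequence number, and the HRegionInfo constructor form are assumptions for illustration, not part of this commit:

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical region; "t1" is an illustrative table name.
    HRegionInfo hri = new HRegionInfo(new HTableDescriptor("t1"), null, null);
    HLogKey key = new HLogKey(hri.getEncodedNameAsBytes(),
        Bytes.toBytes("t1"), 1L, System.currentTimeMillis());
    // Callers that used getRegionName() now read getEncodedRegionName().
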
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+
+/**
+ * Get notification of {@link HLog}/WAL log events. The invocations are inline
+ * so make sure your implementation is fast or you will slow down HBase.
+ */
+public interface WALObserver {
+  /**
+   * The WAL was rolled.
+   * @param newFile the path to the new hlog
+   */
+  public void logRolled(Path newFile);
+
+  /**
+   * A request was made that the WAL be rolled.
+   */
+  public void logRollRequested();
+
+  /**
+   * Called before each write.
+   * @param info the region the edit belongs to
+   * @param logKey the wal key about to be written
+   * @param logEdit the edit about to be written
+   */
+  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+      WALEdit logEdit);
+}

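A minimal sketch of an implementation, assuming it lives in the same package as HLog; the class name and printed message are illustrative:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HRegionInfo;

    // Keep these callbacks cheap: they run inline on the WAL write path.
    public class LoggingWALObserver implements WALObserver {
      public void logRolled(Path newFile) {
        System.out.println("WAL rolled to " + newFile);
      }
      public void logRollRequested() {
        // nothing to do
      }
      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
          WALEdit logEdit) {
        // inspect or annotate the edit before it is persisted
      }
    }
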
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,554 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * This class serves as a helper for all things related to zookeeper
+ * in replication.
+ * <p/>
+ * The layout looks something like this under zookeeper.znode.parent
+ * for the master cluster:
+ * <p/>
+ * <pre>
+ * replication/
+ *  master     {contains a full cluster address}
+ *  state      {contains true or false}
+ *  clusterId  {contains a byte}
+ *  peers/
+ *    1/   {contains a full cluster address}
+ *    2/
+ *    ...
+ *  rs/ {lists all RS that replicate}
+ *    startcode1/ {lists all peer clusters}
+ *      1/ {lists hlogs to process}
+ *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
+ *        10.10.1.76%3A53488.123456790
+ *        ...
+ *      2/
+ *      ...
+ *    startcode2/
+ *    ...
+ * </pre>
+ */
+public class ReplicationZookeeper {
+  private static final Log LOG =
+    LogFactory.getLog(ReplicationZookeeper.class);
+  // Name of znode we use to lock during failover
+  private final static String RS_LOCK_ZNODE = "lock";
+  // Our handle on zookeeper
+  private final ZooKeeperWatcher zookeeper;
+  // Map of addresses of peer clusters with their ZKW
+  private final Map<String, ReplicationZookeeper> peerClusters;
+  // Path to the root replication znode
+  private final String replicationZNode;
+  // Path to the peer clusters znode
+  private final String peersZNode;
+  // Path to the znode that contains all RSs that replicate
+  private final String rsZNode;
+  // Path to this region server's name under rsZNode
+  private final String rsServerNameZnode;
+  // Node name of the replicationState znode
+  private final String replicationStateNodeName;
+  // If this RS is part of a master cluster
+  private final boolean replicationMaster;
+  private final Configuration conf;
+  // Is this cluster replicating at the moment?
+  private final AtomicBoolean replicating;
+  // Byte (stored as string here) that identifies this cluster
+  private final String clusterId;
+  // Abortable
+  private final Abortable abortable;
+
+  /**
+   * Constructor used by region servers, connects to the peer clusters right away.
+   *
+   * @param server the server hosting this helper
+   * @param replicating    atomic boolean to start/stop replication
+   * @throws IOException
+   * @throws KeeperException 
+   */
+  public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
+  throws IOException, KeeperException {
+    this.abortable = server;
+    this.zookeeper = server.getZooKeeper();
+    this.conf = server.getConfiguration();
+    String replicationZNodeName =
+        conf.get("zookeeper.znode.replication", "replication");
+    String peersZNodeName =
+        conf.get("zookeeper.znode.replication.peers", "peers");
+    String repMasterZNodeName =
+        conf.get("zookeeper.znode.replication.master", "master");
+    this.replicationStateNodeName =
+        conf.get("zookeeper.znode.replication.state", "state");
+    String clusterIdZNodeName =
+        conf.get("zookeeper.znode.replication.clusterId", "clusterId");
+    String rsZNodeName =
+        conf.get("zookeeper.znode.replication.rs", "rs");
+    String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+          this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
+          this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+
+    this.peerClusters = new HashMap<String, ReplicationZookeeper>();
+    this.replicationZNode =
+      ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
+    this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
+    this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
+
+    this.replicating = replicating;
+    setReplicating();
+    String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
+    byte [] data = ZKUtil.getData(this.zookeeper, znode);
+    String idResult = Bytes.toString(data);
+    this.clusterId = idResult == null?
+      Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
+
+    znode = ZKUtil.joinZNode(this.replicationZNode, repMasterZNodeName);
+    data = ZKUtil.getData(this.zookeeper, znode);
+    String address = Bytes.toString(data);
+    this.replicationMaster = thisCluster.equals(address);
+    LOG.info("This cluster (" + thisCluster + ") is a " +
+      (this.replicationMaster ? "master" : "slave") + " for replication" +
+        ", compared with (" + address + ")");
+
+    if (server.getServerName() != null) {
+      this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
+      // Set a tracker on the replicationState znode
+      ReplicationStatusTracker tracker =
+        new ReplicationStatusTracker(this.zookeeper, getRepStateNode(), server);
+      tracker.start();
+
+      List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+      if (znodes != null) {
+        for (String z : znodes) {
+          connectToPeer(z);
+        }
+      }
+    } else {
+      this.rsServerNameZnode = null;
+    }
+
+  }
+
+  /**
+   * Returns all region servers from the given peer cluster
+   *
+   * @param peerClusterId id of the cluster to interrogate
+   * @return addresses of all region servers
+   */
+  public List<HServerAddress> getPeersAddresses(String peerClusterId) {
+    if (this.peerClusters.size() == 0) {
+      return new ArrayList<HServerAddress>(0);
+    }
+    ReplicationZookeeper zkw = this.peerClusters.get(peerClusterId);
+    return zkw == null?
+      new ArrayList<HServerAddress>(0):
+      zkw.scanAddressDirectory(this.zookeeper.rsZNode);
+  }
+
+  /**
+   * Scan a directory of address data.
+   * @param znode The parent node
+   * @return The directory contents as HServerAddresses
+   */
+  public List<HServerAddress> scanAddressDirectory(String znode) {
+    List<HServerAddress> list = new ArrayList<HServerAddress>();
+    List<String> nodes = null;
+    try {
+      nodes = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Scanning " + znode, e);
+    }
+    if (nodes == null) {
+      return list;
+    }
+    for (String node : nodes) {
+      String path = ZKUtil.joinZNode(znode, node);
+      list.add(readAddress(path));
+    }
+    return list;
+  }
+
+  private HServerAddress readAddress(String znode) {
+    byte [] data = null;
+    try {
+      data = ZKUtil.getData(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Getting address", e);
+    }
+    return new HServerAddress(Bytes.toString(data));
+  }
+
+  /**
+   * This method connects this cluster to another one and registers it
+   * in this region server's replication znode
+   * @param peerId id of the peer cluster
+   * @throws KeeperException 
+   */
+  private void connectToPeer(String peerId) throws IOException, KeeperException {
+    String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
+    byte [] data = ZKUtil.getData(this.zookeeper, znode);
+    String [] ensemble = Bytes.toString(data).split(":");
+    if (ensemble.length != 3) {
+      throw new IllegalArgumentException("Wrong format of cluster address: " +
+        Bytes.toStringBinary(data));
+    }
+    Configuration otherConf = new Configuration(this.conf);
+    otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
+    otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
+    otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
+    // REENABLE -- FIX!!!!
+    /*
+    ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
+        "connection to cluster: " + peerId);
+    zkw.registerListener(new ReplicationStatusWatcher());
+    this.peerClusters.put(peerId, zkw);
+    this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
+        this.rsServerNameZnode, peerId));
+        */
+    LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+  }
+
+  /**
+   * This reads the state znode for replication and sets the atomic boolean
+   */
+  private void setReplicating() {
+    try {
+      byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
+      String value = Bytes.toString(data);
+      if (value == null) LOG.info(getRepStateNode() + " data is null");
+      else {
+        this.replicating.set(Boolean.parseBoolean(value));
+        LOG.info("Replication is now " + (this.replicating.get()?
+          "started" : "stopped"));
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
+    }
+  }
+
+  private String getRepStateNode() {
+    return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
+  }
+
+  /**
+   * Add a new log to the list of hlogs in zookeeper
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   */
+  public void addLogToList(String filename, String clusterId) {
+    try {
+      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      ZKUtil.createAndWatch(this.zookeeper, znode, Bytes.toBytes(""));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed add log to list", e);
+    }
+  }
+
+  /**
+   * Remove a log from the list of hlogs in zookeeper
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   */
+  public void removeLogFromList(String filename, String clusterId) {
+    try {
+      String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      ZKUtil.deleteChildrenRecursively(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed remove from list", e);
+    }
+  }
+
+  /**
+   * Set the current position of the specified cluster in the current hlog
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   * @param position the position in the file
+   */
+  public void writeReplicationStatus(String filename, String clusterId,
+      long position) {
+    try {
+      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      // Why serialize String of Long and not Long as bytes?
+      ZKUtil.createAndWatch(this.zookeeper, znode,
+        Bytes.toBytes(Long.toString(position)));
+    } catch (KeeperException e) {
+      this.abortable.abort("Writing replication status", e);
+    }
+  }
+
+  /**
+   * Get a list of all the other region servers in this cluster
+   * and set a watch
+   * @param watch the watch to set
+   * @return a list of server names
+   */
+  public List<String> getRegisteredRegionServers(Watcher watch) {
+    List<String> result = null;
+    try {
+      // TODO: This is rsZNode from zk which is like getListOfReplicators
+      // but maybe these are from different zk instances?
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of registered region servers", e);
+    }
+    return result;
+  }
+
+  /**
+   * Get the list of replicators that have queues; they can be alive, dead
+   * or simply left over from a previous run
+   * @return a list of server names
+   * @return a list of server names
+   */
+  public List<String> getListOfReplicators() {
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of replicators", e);
+    }
+    return result;
+  }
+
+  /**
+   * Get the list of peer clusters for the specified server name
+   * @param rs server name of the rs
+   * @return a list of peer clusters
+   */
+  public List<String> getListPeersForRS(String rs) {
+    String znode = ZKUtil.joinZNode(rsZNode, rs);
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of peers for rs", e);
+    }
+    return result;
+  }
+
+  /**
+   * Get the list of hlogs for the specified region server and peer cluster
+   * @param rs server name of the rs
+   * @param id id of the peer cluster
+   * @return a list of hlogs
+   */
+  public List<String> getListHLogsForPeerForRS(String rs, String id) {
+    String znode = ZKUtil.joinZNode(rsZNode, rs);
+    znode = ZKUtil.joinZNode(znode, id);
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of hlogs for peer", e);
+    }
+    return result;
+  }
+
+  /**
+   * Try to set a lock in another server's znode.
+   * @param znode the server name of the other server
+   * @return true if the lock was acquired, false in all other cases
+   */
+  public boolean lockOtherRS(String znode) {
+    try {
+      String parent = ZKUtil.joinZNode(this.rsZNode, znode);
+      String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
+      ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed lock other rs", e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * This method copies all the hlog queues from another region server
+   * and returns them all sorted per peer cluster (appended with the dead
+   * server's znode)
+   * @param znode server name to copy
+   * @return all hlogs for all peers of that cluster, null if an error occurred
+   */
+  public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+    // TODO this method isn't atomic enough, we could start copying and then
+    // TODO fail for some reason and we would end up with znodes we don't want.
+    SortedMap<String,SortedSet<String>> queues =
+        new TreeMap<String,SortedSet<String>>();
+    try {
+      String nodePath = ZKUtil.joinZNode(rsZNode, znode);
+      List<String> clusters =
+        ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
+      // We have a lock znode in there, it will count as one.
+      if (clusters == null || clusters.size() <= 1) {
+        return queues;
+      }
+      // The lock isn't a peer cluster, remove it
+      clusters.remove(RS_LOCK_ZNODE);
+      for (String cluster : clusters) {
+        // We add the name of the recovered RS to the new znode, we can even
+        // do that for queues that were recovered 10 times giving a znode like
+        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+        String newCluster = cluster+"-"+znode;
+        String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster);
+        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
+          HConstants.EMPTY_BYTE_ARRAY);
+        String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
+        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
+        // That region server didn't have anything to replicate for this cluster
+        if (hlogs == null || hlogs.size() == 0) {
+          continue;
+        }
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newCluster, logQueue);
+        for (String hlog : hlogs) {
+          String z = ZKUtil.joinZNode(clusterPath, hlog);
+          byte [] position = ZKUtil.getData(this.zookeeper, z);
+          LOG.debug("Creating " + hlog + " with data " + Bytes.toString(position));
+          String child = ZKUtil.joinZNode(newClusterZnode, hlog);
+          ZKUtil.createAndWatch(this.zookeeper, child, position);
+          logQueue.add(hlog);
+        }
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Copy queues from rs", e);
+    }
+    return queues;
+  }
+
+  /**
+   * Delete a complete queue of hlogs
+   * @param peerZnode znode of the peer cluster queue of hlogs to delete
+   */
+  public void deleteSource(String peerZnode) {
+    try {
+      ZKUtil.deleteChildrenRecursively(this.zookeeper,
+          ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed delete of " + peerZnode, e);
+    }
+  }
+
+  /**
+   * Recursively delete all znodes in the specified rs' znode
+   * @param znode the region server's znode
+   */
+  public void deleteRsQueues(String znode) {
+    try {
+      ZKUtil.deleteChildrenRecursively(this.zookeeper,
+          ZKUtil.joinZNode(rsZNode, znode));
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed delete of " + znode, e);
+    }
+  }
+
+  /**
+   * Delete this region server's queues
+   */
+  public void deleteOwnRSZNode() {
+    deleteRsQueues(this.rsServerNameZnode);
+  }
+
+  /**
+   * Get the position of the specified hlog in the specified peer znode
+   * @param peerId znode of the peer cluster
+   * @param hlog name of the hlog
+   * @return the position in that hlog
+   * @throws KeeperException 
+   */
+  public long getHLogRepPosition(String peerId, String hlog)
+  throws KeeperException {
+    String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
+    String znode = ZKUtil.joinZNode(clusterZnode, hlog);
+    String data = Bytes.toString(ZKUtil.getData(this.zookeeper, znode));
+    return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
+  }
+
+  /**
+   * Tells if this cluster is a master for replication
+   *
+   * @return true if this is a master cluster
+   */
+  public boolean isReplicationMaster() {
+    return this.replicationMaster;
+  }
+
+  /**
+   * Get the identification of the cluster
+   *
+   * @return the id for the cluster
+   */
+  public String getClusterId() {
+    return this.clusterId;
+  }
+
+  /**
+   * Get a map of all peer clusters
+   * @return map of peer cluster id to its replication helper
+   */
+  public Map<String, ReplicationZookeeper> getPeerClusters() {
+    return this.peerClusters;
+  }
+
+  /**
+   * Tracker for status of the replication
+   */
+  public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
+    public ReplicationStatusTracker(ZooKeeperWatcher watcher, String node,
+        Abortable abortable) {
+      super(watcher, node, abortable);
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      super.nodeDataChanged(path);
+      setReplicating();
+    }
+  }
+}
\ No newline at end of file

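The znode layout in the class comment maps directly onto paths built with ZKUtil.joinZNode, as in the constructor above. A sketch using the default node names ("replication", "peers", "rs") and a hypothetical /hbase base znode:

    String base = "/hbase";  // hypothetical zookeeper.znode.parent
    String replicationZNode = ZKUtil.joinZNode(base, "replication");
    String peersZNode = ZKUtil.joinZNode(replicationZNode, "peers"); // /hbase/replication/peers
    String rsZNode = ZKUtil.joinZNode(replicationZNode, "rs");       // /hbase/replication/rs
    // Per-server queues then live at /hbase/replication/rs/<startcode>/<peerId>/<hlog>,
    // which is the path addLogToList() and getHLogRepPosition() operate on.
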
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Tue Aug 31 23:51:44 2010
@@ -25,8 +25,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LogCleanerDelegate;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 
@@ -45,7 +45,7 @@ public class ReplicationLogCleaner imple
   private static final Log LOG =
     LogFactory.getLog(ReplicationLogCleaner.class);
   private Configuration conf;
-  private ReplicationZookeeperWrapper zkHelper;
+  private ReplicationZookeeper zkHelper;
   private Set<String> hlogs = new HashSet<String>();
 
   /**
@@ -78,30 +78,31 @@ public class ReplicationLogCleaner imple
   private boolean refreshHLogsAndSearch(String searchedLog) {
     this.hlogs.clear();
     final boolean lookForLog = searchedLog != null;
-    List<String> rss = zkHelper.getListOfReplicators(this);
-    if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, deleting: " +
-          searchedLog);
-      return false;
-    }
-    for (String rs: rss) {
-      List<String> listOfPeers = zkHelper.getListPeersForRS(rs, this);
-      // if rs just died, this will be null
-      if (listOfPeers == null) {
-        continue;
-      }
-      for (String id : listOfPeers) {
-        List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this);
-        if (peersHlogs != null) {
-          this.hlogs.addAll(peersHlogs);
-        }
-        // early exit if we found the log
-        if(lookForLog && this.hlogs.contains(searchedLog)) {
-          LOG.debug("Found log in ZK, keeping: " + searchedLog);
-          return true;
-        }
-      }
-    }
+// REENABLE
+//    List<String> rss = zkHelper.getListOfReplicators(this);
+//    if (rss == null) {
+//      LOG.debug("Didn't find any region server that replicates, deleting: " +
+//          searchedLog);
+//      return false;
+//    }
+//    for (String rs: rss) {
+//      List<String> listOfPeers = zkHelper.getListPeersForRS(rs, this);
+//      // if rs just died, this will be null
+//      if (listOfPeers == null) {
+//        continue;
+//      }
+//      for (String id : listOfPeers) {
+//        List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this);
+//        if (peersHlogs != null) {
+//          this.hlogs.addAll(peersHlogs);
+//        }
+//        // early exit if we found the log
+//        if(lookForLog && this.hlogs.contains(searchedLog)) {
+//          LOG.debug("Found log in ZK, keeping: " + searchedLog);
+//          return true;
+//        }
+//      }
+//    }
     LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
     return false;
   }
@@ -109,14 +110,15 @@ public class ReplicationLogCleaner imple
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    try {
-      this.zkHelper = new ReplicationZookeeperWrapper(
-          ZooKeeperWrapper.createInstance(this.conf,
-              HMaster.class.getName()),
-          this.conf, new AtomicBoolean(true), null);
-    } catch (IOException e) {
-      LOG.error(e);
-    }
+//    try {
+      // REENABLE
+//      this.zkHelper = new ReplicationZookeeperWrapper(
+//          ZooKeeperWrapper.createInstance(this.conf,
+//              HMaster.class.getName()),
+//          this.conf, new AtomicBoolean(true), null);
+//    } catch (IOException e) {
+//      LOG.error(e);
+//    }
     refreshHLogsAndSearch(null);
   }
 

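With the ZK lookups commented out pending the REENABLE work, the cleaner's decision reduces to a membership check against the hlogs set that refreshHLogsAndSearch() fills. A sketch of the shape of that check; treat the isLogDeletable signature as an assumption about the LogCleanerDelegate contract:

    // A log is only deletable when no replication queue in ZK still references it.
    public boolean isLogDeletable(Path filePath) {
      String log = filePath.getName();
      if (this.hlogs.contains(log)) {
        return false;                      // still referenced, keep it
      }
      return !refreshHLogsAndSearch(log);  // re-scan ZK before allowing deletion
    }
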
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Tue Aug 31 23:51:44 2010
@@ -19,72 +19,76 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-
-import java.io.IOException;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.zookeeper.KeeperException;
 
 /**
- * Replication serves as an umbrella over the setup of replication and
- * is used by HRS.
+ * Gateway to Replication.  Used by {@link HRegionServer}.
  */
-public class Replication implements LogEntryVisitor {
-
+public class Replication implements WALObserver {
   private final boolean replication;
   private final ReplicationSourceManager replicationManager;
   private boolean replicationMaster;
   private final AtomicBoolean replicating = new AtomicBoolean(true);
-  private final ReplicationZookeeperWrapper zkHelper;
+  private final ReplicationZookeeper zkHelper;
   private final Configuration conf;
-  private final AtomicBoolean  stopRequested;
   private ReplicationSink replicationSink;
+  // Hosting server
+  private final Server server;
 
   /**
    * Instantiate the replication management (if rep is enabled).
-   * @param conf conf to use
-   * @param hsi the info if this region server
+   * @param server Hosting server
    * @param fs handle to the filesystem
+   * @param logDir
    * @param oldLogDir directory where logs are archived
-   * @param stopRequested boolean that tells us if we are shutting down
    * @throws IOException
+   * @throws KeeperException 
    */
-  public Replication(Configuration conf, HServerInfo hsi,
-                     FileSystem fs, Path logDir, Path oldLogDir,
-                     AtomicBoolean stopRequested) throws IOException {
-    this.conf = conf;
-    this.stopRequested = stopRequested;
-    this.replication =
-        conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+  public Replication(final Server server, final FileSystem fs,
+      final Path logDir, final Path oldLogDir)
+  throws IOException, KeeperException {
+    this.server = server;
+    this.conf = this.server.getConfiguration();
+    this.replication = isReplication(this.conf);
     if (replication) {
-      this.zkHelper = new ReplicationZookeeperWrapper(
-        ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
-        this.replicating, hsi.getServerName());
+      this.zkHelper = new ReplicationZookeeper(server, this.replicating);
       this.replicationMaster = zkHelper.isReplicationMaster();
       this.replicationManager = this.replicationMaster ?
-        new ReplicationSourceManager(zkHelper, conf, stopRequested,
+        new ReplicationSourceManager(zkHelper, conf, this.server,
           fs, this.replicating, logDir, oldLogDir) : null;
     } else {
-      replicationManager = null;
-      zkHelper = null;
+      this.replicationManager = null;
+      this.zkHelper = null;
     }
   }
 
   /**
+   * @param c Configuration to look at
+   * @return True if replication is enabled.
+   */
+  public static boolean isReplication(final Configuration c) {
+    return c.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+  }
+
+  /**
    * Join with the replication threads
    */
   public void join() {
@@ -92,7 +96,7 @@ public class Replication implements LogE
       if (this.replicationMaster) {
         this.replicationManager.join();
       }
-      this.zkHelper.deleteOwnRSZNode();
+        this.zkHelper.deleteOwnRSZNode();
     }
   }
 
@@ -117,8 +121,7 @@ public class Replication implements LogE
       if (this.replicationMaster) {
         this.replicationManager.init();
       } else {
-        this.replicationSink =
-            new ReplicationSink(this.conf, this.stopRequested);
+        this.replicationSink = new ReplicationSink(this.conf, this.server);
       }
     }
   }
@@ -128,12 +131,12 @@ public class Replication implements LogE
   * @return the manager if replication is enabled, else null
    */
   public ReplicationSourceManager getReplicationManager() {
-    return replicationManager;
+    return this.replicationManager;
   }
 
   @Override
   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
-                                       WALEdit logEdit) {
+      WALEdit logEdit) {
     NavigableMap<byte[], Integer> scopes =
         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
     byte[] family;
@@ -150,13 +153,13 @@ public class Replication implements LogE
     }
   }
 
-  /**
-   * Add this class as a log entry visitor for HLog if replication is enabled
-   * @param hlog log that was add ourselves on
-   */
-  public void addLogEntryVisitor(HLog hlog) {
-    if (replication) {
-      hlog.addLogEntryVisitor(this);
-    }
+  @Override
+  public void logRolled(Path p) {
+    getReplicationManager().logRolled(p);
+  }
+
+  @Override
+  public void logRollRequested() {
+    // Not interested
   }
-}
+}
\ No newline at end of file

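The new static isReplication(Configuration) lets callers probe the enable flag before constructing anything. A usage sketch; HConstants.REPLICATION_ENABLE_KEY is the flag the method reads:

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    if (Replication.isReplication(conf)) {
      // safe to construct new Replication(server, fs, logDir, oldLogDir)
    }
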
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Tue Aug 31 23:51:44 2010
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.Stoppable;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -60,8 +61,8 @@ public class ReplicationSink {
   private final Configuration conf;
   // Pool used to replicated
   private final HTablePool pool;
-  // boolean coming from HRS to know when the process stops
-  private final AtomicBoolean stop;
+  // Chain to pull on when we want all to stop.
+  private final Stoppable stopper;
   private final ReplicationSinkMetrics metrics;
 
   /**
@@ -71,12 +72,12 @@ public class ReplicationSink {
   * @param stopper             stoppable to pull on when this sink must stop
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
-  public ReplicationSink(Configuration conf, AtomicBoolean stopper)
+  public ReplicationSink(Configuration conf, Stoppable stopper)
       throws IOException {
     this.conf = conf;
     this.pool = new HTablePool(this.conf,
         conf.getInt("replication.sink.htablepool.capacity", 10));
-    this.stop = stopper;
+    this.stopper = stopper;
     this.metrics = new ReplicationSinkMetrics();
   }
 
@@ -146,14 +147,14 @@ public class ReplicationSink {
       } else {
         // Should we log rejected edits in a file for replay?
         LOG.error("Unable to accept edit because", ex);
-        this.stop.set(true);
+        this.stopper.stop("Unable to accept edit because " + ex.getMessage());
         throw ex;
       }
     } catch (RuntimeException re) {
       if (re.getCause() instanceof TableNotFoundException) {
         LOG.warn("Losing edits because: ", re);
       } else {
-        this.stop.set(true);
+        this.stopper.stop("Replication stopped us because " + re.getMessage());
         throw re;
       }
     }

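ReplicationSink now takes a Stoppable instead of a raw AtomicBoolean, so a stop request carries its reason. A minimal sketch of a Stoppable that a test or standalone caller might pass in (the class name is made up; in the region server the hosting Server plays this role):

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.hadoop.hbase.Stoppable;

    class SimpleStopper implements Stoppable {
      private final AtomicBoolean stopped = new AtomicBoolean(false);
      public void stop(String why) {
        System.out.println("Stopping because: " + why);
        this.stopped.set(true);
      }
      public boolean isStopped() {
        return this.stopped.get();
      }
    }
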
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Aug 31 23:51:44 2010
@@ -19,6 +19,22 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,32 +44,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * Class that handles the source of a replication stream.
  * Currently does not handle more than 1 slave
@@ -76,7 +77,7 @@ public class ReplicationSource extends T
   private HLog.Entry[] entriesArray;
   private HConnection conn;
   // Helper class for zookeeper
-  private ReplicationZookeeperWrapper zkHelper;
+  private ReplicationZookeeper zkHelper;
   private Configuration conf;
  // ratio of region servers to choose from a slave cluster
   private float ratio;
@@ -88,7 +89,7 @@ public class ReplicationSource extends T
   // The manager of all sources to which we ping back our progress
   private ReplicationSourceManager manager;
   // Should we stop everything?
-  private AtomicBoolean stop;
+  private Stoppable stopper;
   // List of chosen sinks (region servers)
   private List<HServerAddress> currentPeers;
   // How long should we sleep for each retry
@@ -139,11 +140,11 @@ public class ReplicationSource extends T
   public void init(final Configuration conf,
                    final FileSystem fs,
                    final ReplicationSourceManager manager,
-                   final AtomicBoolean stopper,
+                   final Stoppable stopper,
                    final AtomicBoolean replicating,
                    final String peerClusterZnode)
       throws IOException {
-    this.stop = stopper;
+    this.stopper = stopper;
     this.conf = conf;
     this.replicationQueueSizeCapacity =
         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
@@ -224,18 +225,18 @@ public class ReplicationSource extends T
   public void run() {
     connectToPeers();
     // We were stopped while looping to connect to sinks, just abort
-    if (this.stop.get()) {
+    if (this.stopper.isStopped()) {
       return;
     }
     // If this is recovered, the queue is already full and the first log
     // normally has a position (unless the RS failed between 2 logs)
     if (this.queueRecovered) {
-      this.position = this.zkHelper.getHLogRepPosition(
-          this.peerClusterZnode, this.queue.peek().getName());
+//      this.position = this.zkHelper.getHLogRepPosition(
+//          this.peerClusterZnode, this.queue.peek().getName());
     }
     int sleepMultiplier = 1;
     // Loop until we close down
-    while (!stop.get() && this.running) {
+    while (!stopper.isStopped() && this.running) {
       // Get a new path
       if (!getNextPath()) {
         if (sleepForRetries("No log to process", sleepMultiplier)) {
@@ -311,7 +312,7 @@ public class ReplicationSource extends T
       // If we didn't get anything to replicate, or if we hit a IOE,
       // wait a bit and retry.
       // But if we need to stop, don't bother sleeping
-      if (!stop.get() && (gotIOE || currentNbEntries == 0)) {
+      if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) {
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -373,7 +374,7 @@ public class ReplicationSource extends T
 
   private void connectToPeers() {
     // Connect to peer cluster first, unless we have to stop
-    while (!this.stop.get() && this.currentPeers.size() == 0) {
+    while (!this.stopper.isStopped() && this.currentPeers.size() == 0) {
       try {
         chooseSinks();
         Thread.sleep(this.sleepForRetries);
@@ -517,7 +518,7 @@ public class ReplicationSource extends T
    */
   protected void shipEdits() {
     int sleepMultiplier = 1;
-    while (!stop.get()) {
+    while (!this.stopper.isStopped()) {
       try {
         HRegionInterface rrs = getRS();
         LOG.debug("Replicating " + currentNbEntries);
@@ -549,7 +550,7 @@ public class ReplicationSource extends T
                 chooseSinks();
               }
             }
-          } while (!stop.get() && down);
+          } while (!this.stopper.isStopped() && down);
         } catch (InterruptedException e) {
           LOG.debug("Interrupted while trying to contact the peer cluster");
         }
@@ -688,5 +689,4 @@ public class ReplicationSource extends T
       return Long.parseLong(parts[parts.length-1]);
     }
   }
-
 }

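The retry loops above share one backoff pattern: sleep a base interval scaled by a multiplier that the caller bumps until a cap. A sketch of sleepForRetries with field names borrowed from this class; the maxRetriesMultiplier cap is an assumption:

    // Returns true while the caller should keep increasing the multiplier.
    protected boolean sleepForRetries(String msg, int sleepMultiplier) {
      try {
        LOG.debug(msg + ", sleeping " + this.sleepForRetries + " times " + sleepMultiplier);
        Thread.sleep(this.sleepForRetries * sleepMultiplier);
      } catch (InterruptedException e) {
        LOG.debug("Interrupted while sleeping between retries");
      }
      return sleepMultiplier < this.maxRetriesMultiplier;  // assumed cap field
    }
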
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Tue Aug 31 23:51:44 2010
@@ -19,13 +19,13 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hbase.Stoppable;
 
 /**
  * Interface that defines a replication source
@@ -45,7 +45,7 @@ public interface ReplicationSourceInterf
   public void init(final Configuration conf,
                    final FileSystem fs,
                    final ReplicationSourceManager manager,
-                   final AtomicBoolean stopper,
+                   final Stoppable stopper,
                    final AtomicBoolean replicating,
                    final String peerClusterId) throws IOException;
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Tue Aug 31 23:51:44 2010
@@ -20,16 +20,6 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -39,6 +29,16 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
 /**
  * This class is responsible to manage all the replication
  * sources. There are two classes of sources:
@@ -50,8 +50,7 @@ import java.util.concurrent.atomic.Atomi
  * tries to grab a lock in order to transfer all the queues in a local
  * old source.
  */
-public class ReplicationSourceManager implements LogActionsListener {
-
+public class ReplicationSourceManager {
   private static final Log LOG =
       LogFactory.getLog(ReplicationSourceManager.class);
   // List of all the sources that read this RS's logs
@@ -61,9 +60,9 @@ public class ReplicationSourceManager im
   // Indicates if we are currently replicating
   private final AtomicBoolean replicating;
   // Helper for zookeeper
-  private final ReplicationZookeeperWrapper zkHelper;
-  // Indicates if the region server is closing
-  private final AtomicBoolean stopper;
+  private final ReplicationZookeeper zkHelper;
+  // All about stopping
+  private final Stoppable stopper;
   // All logs we are currently tracking
   private final SortedSet<String> hlogs;
   private final Configuration conf;
@@ -88,9 +87,9 @@ public class ReplicationSourceManager im
    * @param logDir the directory that contains all hlog directories of live RSs
    * @param oldLogDir the directory where old logs are archived
    */
-  public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
+  public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
                                   final Configuration conf,
-                                  final AtomicBoolean stopper,
+                                  final Stoppable stopper,
                                   final FileSystem fs,
                                   final AtomicBoolean replicating,
                                   final Path logDir,
@@ -146,7 +145,7 @@ public class ReplicationSourceManager im
       ReplicationSourceInterface src = addSource(id);
       src.startup();
     }
-    List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
+    List<String> currentReplicators = this.zkHelper.getListOfReplicators();
     synchronized (otherRegionServers) {
       LOG.info("Current list of replicators: " + currentReplicators
           + " other RSs: " + otherRegionServers);
@@ -198,7 +197,7 @@ public class ReplicationSourceManager im
    * @return a sorted set of hlog names
    */
   protected SortedSet<String> getHLogs() {
-    return new TreeSet(this.hlogs);
+    return new TreeSet<String>(this.hlogs);
   }
 
   /**
@@ -209,8 +208,7 @@ public class ReplicationSourceManager im
     return this.sources;
   }
 
-  @Override
-  public void logRolled(Path newLog) {
+  void logRolled(Path newLog) {
     if (this.sources.size() > 0) {
       this.zkHelper.addLogToList(newLog.getName(),
           this.sources.get(0).getPeerClusterZnode());
@@ -229,7 +227,7 @@ public class ReplicationSourceManager im
    * Get the ZK help of this manager
    * @return the helper
    */
-  public ReplicationZookeeperWrapper getRepZkWrapper() {
+  public ReplicationZookeeper getRepZkWrapper() {
     return zkHelper;
   }
 
@@ -248,11 +246,12 @@ public class ReplicationSourceManager im
       final Configuration conf,
       final FileSystem fs,
       final ReplicationSourceManager manager,
-      final AtomicBoolean stopper,
+      final Stoppable stopper,
       final AtomicBoolean replicating,
       final String peerClusterId) throws IOException {
     ReplicationSourceInterface src;
     try {
+      @SuppressWarnings("rawtypes")
       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
           ReplicationSource.class.getCanonicalName()));
       src = (ReplicationSourceInterface) c.newInstance();
@@ -276,7 +275,7 @@ public class ReplicationSourceManager im
    */
   public void transferQueues(String rsZnode) {
     // We try to lock that rs' queue directory
-    if (this.stopper.get()) {
+    if (this.stopper.isStopped()) {
       LOG.info("Not transferring queue since we are shutting down");
       return;
     }
@@ -372,5 +371,4 @@ public class ReplicationSourceManager im
   public FileSystem getFs() {
     return this.fs;
   }
-
 }

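getReplicationSource() instantiates the source reflectively, so a custom ReplicationSourceInterface can be plugged in purely through configuration. A sketch; the implementation class name is hypothetical:

    Configuration conf = HBaseConfiguration.create();
    conf.set("replication.replicationsource.implementation",
        "org.example.MyReplicationSource");  // hypothetical implementation
    // The manager then does Class.forName() on this name and calls
    // newInstance(), defaulting to ReplicationSource when the key is unset.
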
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Tue Aug 31 23:51:44 2010
@@ -18,6 +18,17 @@
 
 package org.apache.hadoop.hbase.thrift;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -35,7 +46,6 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -73,17 +83,6 @@ import org.apache.thrift.transport.TServ
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportFactory;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
 /**
  * ThriftServer - this class starts up a Thrift server which implements the
  * Hbase API specified in the Hbase.thrift IDL file.
@@ -108,7 +107,6 @@ public class ThriftServer {
       protected Map<String, HTable> initialValue() {
         return new TreeMap<String, HTable>();
       }
-
     };
 
     /**
@@ -183,11 +181,16 @@ public class ThriftServer {
 
     /**
      * Constructs an HBaseHandler object.
-     *
-     * @throws MasterNotRunningException
+     * @throws IOException 
      */
-    HBaseHandler() throws MasterNotRunningException {
-      conf = HBaseConfiguration.create();
+    HBaseHandler()
+    throws IOException {
+      this(HBaseConfiguration.create());
+    }
+
+    HBaseHandler(final Configuration c)
+    throws IOException {
+      this.conf = c;
       admin = new HBaseAdmin(conf);
       scannerMap = new HashMap<Integer, ResultScanner>();
     }
@@ -210,7 +213,7 @@ public class ThriftServer {
 
     public boolean isTableEnabled(final byte[] tableName) throws IOError {
       try {
-        return HTable.isTableEnabled(tableName);
+        return HTable.isTableEnabled(this.conf, tableName);
       } catch (IOException e) {
         throw new IOError(e.getMessage());
       }
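
The new HBaseHandler(Configuration) overload above lets a test or embedding
process hand the Thrift handler a prepared Configuration instead of always
building one internally, and isTableEnabled now resolves against that same
instance. A sketch of a caller, assuming package access from
org.apache.hadoop.hbase.thrift (the constructors are package-private) and a
caller method declared to throw IOException; the quorum override is
illustrative:

    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "localhost");  // assumed override
    ThriftServer.HBaseHandler handler = new ThriftServer.HBaseHandler(conf);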

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Tue Aug 31 23:51:44 2010
@@ -394,7 +394,7 @@ public class FSUtils {
   public static Map<String, Integer> getTableFragmentation(
     final HMaster master)
   throws IOException {
-    Path path = master.getRootDir();
+    Path path = getRootDir(master.getConfiguration());
     // since HMaster.getFileSystem() is package private
     FileSystem fs = path.getFileSystem(master.getConfiguration());
     return getTableFragmentation(fs, path);
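
With this change getTableFragmentation no longer reaches into package-private
master state; the root directory is resolved from the master's Configuration.
A short sketch of the same resolution, assuming FSUtils.getRootDir reads
hbase.rootdir from the Configuration (as the call above implies):

    // Sketch; 'master' is an HMaster reference supplied by the caller.
    Path root = FSUtils.getRootDir(master.getConfiguration());
    FileSystem fs = root.getFileSystem(master.getConfiguration());
    Map<String, Integer> frag = FSUtils.getTableFragmentation(fs, root);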

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseConfTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseConfTool.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseConfTool.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseConfTool.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+/**
+ * Tool that prints out the value of a configuration key.
+ * Pass the key on the command-line.
+ */
+public class HBaseConfTool {
+  public static void main(String args[]) {
+    if (args.length < 1) {
+      System.err.println("Usage: HBaseConfTool <CONFIGURATION_KEY>");
+      System.exit(1);
+      return;
+    }
+
+    Configuration conf = HBaseConfiguration.create();
+    System.out.println(conf.get(args[0]));
+  }
+}
\ No newline at end of file
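
HBaseConfTool is aimed at shell scripts that need one resolved configuration
value; an invocation along the lines of "bin/hbase
org.apache.hadoop.hbase.util.HBaseConfTool hbase.rootdir" (launcher form
assumed) prints the value to stdout. The programmatic equivalent, for callers
already on the JVM:

    // Sketch; "hbase.rootdir" is an example key, not the only valid one.
    Configuration conf = HBaseConfiguration.create();
    System.out.println(conf.get("hbase.rootdir"));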

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HMerge.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HMerge.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HMerge.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,429 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+/**
+ * A non-instantiable class that has a static method capable of compacting
+ * a table by merging adjacent regions.
+ */
+class HMerge {
+  static final Log LOG = LogFactory.getLog(HMerge.class);
+  static final Random rand = new Random();
+
+  /*
+   * Not instantiable
+   */
+  private HMerge() {
+    super();
+  }
+
+  /**
+   * Scans the table and merges two adjacent regions if they are small. This
+   * only happens when a lot of rows are deleted.
+   *
+   * When merging the META region, the HBase instance must be offline.
+   * When merging a normal table, the HBase instance must be online, but the
+   * table must be disabled.
+   *
+   * @param conf        - configuration object for HBase
+   * @param fs          - FileSystem where regions reside
+   * @param tableName   - Table to be compacted
+   * @throws IOException
+   */
+  public static void merge(Configuration conf, FileSystem fs,
+    final byte [] tableName)
+  throws IOException {
+    merge(conf, fs, tableName, true);
+  }
+
+  /**
+   * Scans the table and merges two adjacent regions if they are small. This
+   * only happens when a lot of rows are deleted.
+   *
+   * When merging the META region, the HBase instance must be offline.
+   * When merging a normal table, the HBase instance must be online, but the
+   * table must be disabled.
+   *
+   * @param conf        - configuration object for HBase
+   * @param fs          - FileSystem where regions reside
+   * @param tableName   - Table to be compacted
+   * @param testMasterRunning True if we are to verify master is down before
+   * running merge
+   * @throws IOException
+   */
+  public static void merge(Configuration conf, FileSystem fs,
+    final byte [] tableName, final boolean testMasterRunning)
+  throws IOException {
+    boolean masterIsRunning = false;
+    if (testMasterRunning) {
+      HConnection connection = HConnectionManager.getConnection(conf);
+      masterIsRunning = connection.isMasterRunning();
+    }
+    HConnectionManager.deleteConnectionInfo(conf, false);
+    if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
+      if (masterIsRunning) {
+        throw new IllegalStateException(
+            "Can not compact META table if instance is on-line");
+      }
+      new OfflineMerger(conf, fs).process();
+    } else {
+      if(!masterIsRunning) {
+        throw new IllegalStateException(
+            "HBase instance must be running to merge a normal table");
+      }
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      if (!admin.isTableDisabled(tableName)) {
+        throw new TableNotDisabledException(tableName);
+      }
+      new OnlineMerger(conf, fs, tableName).process();
+    }
+  }
+
+  private static abstract class Merger {
+    protected final Configuration conf;
+    protected final FileSystem fs;
+    protected final Path tabledir;
+    protected final HLog hlog;
+    private final long maxFilesize;
+
+    protected Merger(Configuration conf, FileSystem fs,
+      final byte [] tableName)
+    throws IOException {
+      this.conf = conf;
+      this.fs = fs;
+      this.maxFilesize = conf.getLong("hbase.hregion.max.filesize",
+          HConstants.DEFAULT_MAX_FILE_SIZE);
+
+      this.tabledir = new Path(
+          fs.makeQualified(new Path(conf.get(HConstants.HBASE_DIR))),
+          Bytes.toString(tableName)
+      );
+      Path logdir = new Path(tabledir, "merge_" + System.currentTimeMillis() +
+          HConstants.HREGION_LOGDIR_NAME);
+      Path oldLogDir = new Path(tabledir, HConstants.HREGION_OLDLOGDIR_NAME);
+      this.hlog = new HLog(fs, logdir, oldLogDir, conf);
+    }
+
+    void process() throws IOException {
+      try {
+        for(HRegionInfo[] regionsToMerge = next();
+            regionsToMerge != null;
+            regionsToMerge = next()) {
+          if (!merge(regionsToMerge)) {
+            return;
+          }
+        }
+      } finally {
+        try {
+          hlog.closeAndDelete();
+        } catch(IOException e) {
+          LOG.error(e);
+        }
+      }
+    }
+
+    protected boolean merge(final HRegionInfo[] info) throws IOException {
+      if(info.length < 2) {
+        LOG.info("only one region - nothing to merge");
+        return false;
+      }
+
+      HRegion currentRegion = null;
+      long currentSize = 0;
+      HRegion nextRegion = null;
+      long nextSize = 0;
+      for (int i = 0; i < info.length - 1; i++) {
+        if (currentRegion == null) {
+          currentRegion =
+            HRegion.newHRegion(tabledir, hlog, fs, conf, info[i], null);
+          currentRegion.initialize();
+          currentSize = currentRegion.getLargestHStoreSize();
+        }
+        nextRegion =
+          HRegion.newHRegion(tabledir, hlog, fs, conf, info[i + 1], null);
+        nextRegion.initialize();
+        nextSize = nextRegion.getLargestHStoreSize();
+
+        if ((currentSize + nextSize) <= (maxFilesize / 2)) {
+          // We merge two adjacent regions if their total size is less than
+          // one half of the desired maximum size
+          LOG.info("merging regions " + Bytes.toString(currentRegion.getRegionName())
+              + " and " + Bytes.toString(nextRegion.getRegionName()));
+          HRegion mergedRegion =
+            HRegion.mergeAdjacent(currentRegion, nextRegion);
+          updateMeta(currentRegion.getRegionName(), nextRegion.getRegionName(),
+              mergedRegion);
+          break;
+        }
+        LOG.info("not merging regions " + Bytes.toString(currentRegion.getRegionName())
+            + " and " + Bytes.toString(nextRegion.getRegionName()));
+        currentRegion.close();
+        currentRegion = nextRegion;
+        currentSize = nextSize;
+      }
+      if(currentRegion != null) {
+        currentRegion.close();
+      }
+      return true;
+    }
+
+    protected abstract HRegionInfo[] next() throws IOException;
+
+    protected abstract void updateMeta(final byte [] oldRegion1,
+      final byte [] oldRegion2, HRegion newRegion)
+    throws IOException;
+
+  }
+
+  /** Instantiated to compact a normal user table */
+  private static class OnlineMerger extends Merger {
+    private final byte [] tableName;
+    private final HTable table;
+    private final ResultScanner metaScanner;
+    private HRegionInfo latestRegion;
+
+    OnlineMerger(Configuration conf, FileSystem fs,
+      final byte [] tableName)
+    throws IOException {
+      super(conf, fs, tableName);
+      this.tableName = tableName;
+      this.table = new HTable(conf, HConstants.META_TABLE_NAME);
+      this.metaScanner = table.getScanner(HConstants.CATALOG_FAMILY,
+          HConstants.REGIONINFO_QUALIFIER);
+      this.latestRegion = null;
+    }
+
+    private HRegionInfo nextRegion() throws IOException {
+      try {
+        Result results = getMetaRow();
+        if (results == null) {
+          return null;
+        }
+        byte[] regionInfoValue = results.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.REGIONINFO_QUALIFIER);
+        if (regionInfoValue == null || regionInfoValue.length == 0) {
+          throw new NoSuchElementException("meta region entry missing " +
+              Bytes.toString(HConstants.CATALOG_FAMILY) + ":" +
+              Bytes.toString(HConstants.REGIONINFO_QUALIFIER));
+        }
+        HRegionInfo region = Writables.getHRegionInfo(regionInfoValue);
+        if (!Bytes.equals(region.getTableDesc().getName(), this.tableName)) {
+          return null;
+        }
+        return region;
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.error("meta scanner error", e);
+        metaScanner.close();
+        throw e;
+      }
+    }
+
+    /*
+     * Check that the current row has an HRegionInfo.  Skip to the next row
+     * if the HRI is empty.
+     * @return A Result holding the row content, else null if we are off the end.
+     * @throws IOException
+     */
+    private Result getMetaRow() throws IOException {
+      Result currentRow = metaScanner.next();
+      boolean foundResult = false;
+      while (currentRow != null) {
+        LOG.info("Row: <" + Bytes.toString(currentRow.getRow()) + ">");
+        byte[] regionInfoValue = currentRow.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.REGIONINFO_QUALIFIER);
+        if (regionInfoValue == null || regionInfoValue.length == 0) {
+          currentRow = metaScanner.next();
+          continue;
+        }
+        foundResult = true;
+        break;
+      }
+      return foundResult ? currentRow : null;
+    }
+
+    @Override
+    protected HRegionInfo[] next() throws IOException {
+      List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+      if(latestRegion == null) {
+        latestRegion = nextRegion();
+      }
+      if(latestRegion != null) {
+        regions.add(latestRegion);
+      }
+      latestRegion = nextRegion();
+      if(latestRegion != null) {
+        regions.add(latestRegion);
+      }
+      return regions.toArray(new HRegionInfo[regions.size()]);
+    }
+
+    @Override
+    protected void updateMeta(final byte [] oldRegion1,
+        final byte [] oldRegion2,
+      HRegion newRegion)
+    throws IOException {
+      byte[][] regionsToDelete = {oldRegion1, oldRegion2};
+      for (int r = 0; r < regionsToDelete.length; r++) {
+        if(latestRegion != null &&
+            Bytes.equals(regionsToDelete[r], latestRegion.getRegionName())) {
+          latestRegion = null;
+        }
+        Delete delete = new Delete(regionsToDelete[r]);
+        table.delete(delete);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("updated columns in row: " + Bytes.toString(regionsToDelete[r]));
+        }
+      }
+      newRegion.getRegionInfo().setOffline(true);
+
+      Put put = new Put(newRegion.getRegionName());
+      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+        Writables.getBytes(newRegion.getRegionInfo()));
+      table.put(put);
+
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("updated columns in row: "
+            + Bytes.toString(newRegion.getRegionName()));
+      }
+    }
+  }
+
+  /** Instantiated to compact the meta region */
+  private static class OfflineMerger extends Merger {
+    private final List<HRegionInfo> metaRegions = new ArrayList<HRegionInfo>();
+    private final HRegion root;
+
+    OfflineMerger(Configuration conf, FileSystem fs)
+        throws IOException {
+      super(conf, fs, HConstants.META_TABLE_NAME);
+
+      Path rootTableDir = HTableDescriptor.getTableDir(
+          fs.makeQualified(new Path(conf.get(HConstants.HBASE_DIR))),
+          HConstants.ROOT_TABLE_NAME);
+
+      // Scan root region to find all the meta regions
+
+      root = HRegion.newHRegion(rootTableDir, hlog, fs, conf,
+          HRegionInfo.ROOT_REGIONINFO, null);
+      root.initialize();
+
+      Scan scan = new Scan();
+      scan.addColumn(HConstants.CATALOG_FAMILY,
+          HConstants.REGIONINFO_QUALIFIER);
+      InternalScanner rootScanner =
+        root.getScanner(scan);
+
+      try {
+        List<KeyValue> results = new ArrayList<KeyValue>();
+        while(rootScanner.next(results)) {
+          for(KeyValue kv: results) {
+            HRegionInfo info = Writables.getHRegionInfoOrNull(kv.getValue());
+            if (info != null) {
+              metaRegions.add(info);
+            }
+          }
+          // The scanner appends into the passed list; clear it between
+          // batches so entries are not re-added on the next iteration.
+          results.clear();
+        }
+      } finally {
+        rootScanner.close();
+        try {
+          root.close();
+        } catch(IOException e) {
+          LOG.error(e);
+        }
+      }
+    }
+
+    @Override
+    protected HRegionInfo[] next() {
+      HRegionInfo[] results = null;
+      if (metaRegions.size() > 0) {
+        results = metaRegions.toArray(new HRegionInfo[metaRegions.size()]);
+        metaRegions.clear();
+      }
+      return results;
+    }
+
+    @Override
+    protected void updateMeta(final byte [] oldRegion1,
+      final byte [] oldRegion2, HRegion newRegion)
+    throws IOException {
+      byte[][] regionsToDelete = {oldRegion1, oldRegion2};
+      for(int r = 0; r < regionsToDelete.length; r++) {
+        Delete delete = new Delete(regionsToDelete[r]);
+        delete.deleteColumns(HConstants.CATALOG_FAMILY,
+            HConstants.REGIONINFO_QUALIFIER);
+        delete.deleteColumns(HConstants.CATALOG_FAMILY,
+            HConstants.SERVER_QUALIFIER);
+        delete.deleteColumns(HConstants.CATALOG_FAMILY,
+            HConstants.STARTCODE_QUALIFIER);
+        delete.deleteColumns(HConstants.CATALOG_FAMILY,
+            HConstants.SPLITA_QUALIFIER);
+        delete.deleteColumns(HConstants.CATALOG_FAMILY,
+            HConstants.SPLITB_QUALIFIER);
+        root.delete(delete, null, true);
+
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("updated columns in row: " + Bytes.toString(regionsToDelete[r]));
+        }
+      }
+      HRegionInfo newInfo = newRegion.getRegionInfo();
+      newInfo.setOffline(true);
+      Put put = new Put(newRegion.getRegionName());
+      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+          Writables.getBytes(newInfo));
+      root.put(put);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("updated columns in row: " + Bytes.toString(newRegion.getRegionName()));
+      }
+    }
+  }
+}
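
HMerge is package-private and exposes only static merge entry points, so a
driver has to live in org.apache.hadoop.hbase.util. A hedged sketch, assuming
a running cluster and a user table that has already been disabled:

    package org.apache.hadoop.hbase.util;  // HMerge is package-private

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Hypothetical driver class, not part of this commit.
    public class HMergeDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        // Verifies master state by default; fails if the table is enabled.
        HMerge.merge(conf, fs, Bytes.toBytes(args[0]));
      }
    }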

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java Tue Aug 31 23:51:44 2010
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -41,7 +42,7 @@ public class JVMClusterUtil {
     private final HRegionServer regionServer;
 
     public RegionServerThread(final HRegionServer r, final int index) {
-      super(r, "RegionServer:" + index);
+      super(r, "RegionServer:" + index + ";" + r.getServerName());
       this.regionServer = r;
     }
 
@@ -60,7 +61,7 @@ public class JVMClusterUtil {
       // cases, we'll jump out of the run without setting online flag.  Check
      // stopRequested so we don't wait here on a flag that will never be flipped.
       while (!this.regionServer.isOnline() &&
-          !this.regionServer.isStopRequested()) {
+          !this.regionServer.isStopped()) {
         try {
           Thread.sleep(1000);
         } catch (InterruptedException e) {
@@ -80,17 +81,22 @@ public class JVMClusterUtil {
    * @return Region server added.
    */
   public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c,
-    final Class<? extends HRegionServer> hrsc, final int index)
+      final Class<? extends HRegionServer> hrsc, final int index)
   throws IOException {
-      HRegionServer server;
-      try {
-        server = hrsc.getConstructor(Configuration.class).newInstance(c);
-      } catch (Exception e) {
-        IOException ioe = new IOException();
-        ioe.initCause(e);
-        throw ioe;
-      }
-      return new JVMClusterUtil.RegionServerThread(server, index);
+    HRegionServer server;
+    try {
+      server = hrsc.getConstructor(Configuration.class).newInstance(c);
+    } catch (InvocationTargetException ite) {
+      Throwable target = ite.getTargetException();
+      throw new RuntimeException("Failed construction of RegionServer: " +
+        hrsc.toString() + ((target.getCause() != null)?
+          target.getCause().getMessage(): ""), target);
+    } catch (Exception e) {
+      IOException ioe = new IOException();
+      ioe.initCause(e);
+      throw ioe;
+    }
+    return new JVMClusterUtil.RegionServerThread(server, index);
   }
 
   /**
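
Two things change in JVMClusterUtil above: the thread name now carries the
server name, and a failure inside the HRegionServer constructor is unwrapped
from InvocationTargetException instead of surfacing as a bare reflective
wrapper. Typical use, sketched; waitForServerOnline() is assumed to be the
wait loop shown in the second hunk:

    // Sketch: start an in-JVM region server; the index is arbitrary.
    JVMClusterUtil.RegionServerThread t =
      JVMClusterUtil.createRegionServerThread(conf, HRegionServer.class, 0);
    t.start();
    t.waitForServerOnline();  // assumption: blocks until online or stopped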

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Merge.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Merge.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Merge.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Merge.java Tue Aug 31 23:51:44 2010
@@ -277,9 +277,9 @@ public class Merge extends Configured im
     }
     HRegion merged = null;
     HLog log = utils.getLog();
-    HRegion r1 = HRegion.openHRegion(info1, this.rootdir, log, getConf());
+    HRegion r1 = HRegion.openHRegion(info1, log, getConf());
     try {
-      HRegion r2 = HRegion.openHRegion(info2, this.rootdir, log, getConf());
+      HRegion r2 = HRegion.openHRegion(info2, log, getConf());
       try {
         merged = HRegion.merge(r1, r2);
       } finally {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java Tue Aug 31 23:51:44 2010
@@ -103,7 +103,7 @@ public class MetaUtils {
           HConstants.HREGION_LOGDIR_NAME + "_" + System.currentTimeMillis());
       Path oldLogDir = new Path(this.fs.getHomeDirectory(),
           HConstants.HREGION_OLDLOGDIR_NAME);
-      this.log = new HLog(this.fs, logdir, oldLogDir, this.conf, null);
+      this.log = new HLog(this.fs, logdir, oldLogDir, this.conf);
     }
     return this.log;
   }
@@ -266,15 +266,14 @@ public class MetaUtils {
     if (this.rootRegion != null) {
       return this.rootRegion;
     }
-    this.rootRegion = HRegion.openHRegion(HRegionInfo.ROOT_REGIONINFO,
-      this.rootdir, getLog(), this.conf);
+    this.rootRegion = HRegion.openHRegion(HRegionInfo.ROOT_REGIONINFO, getLog(),
+      this.conf);
     this.rootRegion.compactStores();
     return this.rootRegion;
   }
 
   private HRegion openMetaRegion(HRegionInfo metaInfo) throws IOException {
-    HRegion meta =
-      HRegion.openHRegion(metaInfo, this.rootdir, getLog(), this.conf);
+    HRegion meta = HRegion.openHRegion(metaInfo, getLog(), this.conf);
     meta.compactStores();
     return meta;
   }
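
Both Merge and MetaUtils above pick up the three-argument
HRegion.openHRegion(info, log, conf): the region root directory is now
derived from the Configuration rather than threaded through as a Path. The
new call shape, sketched inside a method declared to throw IOException:

    // Sketch; assumes conf carries hbase.rootdir.
    HRegion r = HRegion.openHRegion(info, log, conf);
    try {
      // ... read, compact, or merge ...
    } finally {
      r.close();
    }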

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Sleeper.java Tue Aug 31 23:51:44 2010
@@ -21,8 +21,7 @@ package org.apache.hadoop.hbase.util;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hbase.Stoppable;
 
 /**
  * Sleeper for current thread.
@@ -33,7 +32,7 @@ import java.util.concurrent.atomic.Atomi
 public class Sleeper {
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
   private final int period;
-  private final AtomicBoolean stop;
+  private final Stoppable stopper;
   private static final long MINIMAL_DELTA_FOR_LOGGING = 10000;
 
   private final Object sleepLock = new Object();
@@ -41,11 +40,12 @@ public class Sleeper {
 
   /**
    * @param sleep sleep time in milliseconds
-   * @param stop flag for when we stop
+   * @param stopper When {@link Stoppable#isStopped()} is true, this thread will
+   * clean up and exit cleanly.
    */
-  public Sleeper(final int sleep, final AtomicBoolean stop) {
+  public Sleeper(final int sleep, final Stoppable stopper) {
     this.period = sleep;
-    this.stop = stop;
+    this.stopper = stopper;
   }
 
   /**
@@ -72,7 +72,7 @@ public class Sleeper {
   * will be docked current time minus passed <code>startTime</code>.
    */
   public void sleep(final long startTime) {
-    if (this.stop.get()) {
+    if (this.stopper.isStopped()) {
       return;
     }
     long now = System.currentTimeMillis();
@@ -101,7 +101,7 @@ public class Sleeper {
       } catch(InterruptedException iex) {
        // Were we interrupted because we're meant to stop?  If not, just
         // continue ignoring the interruption
-        if (this.stop.get()) {
+        if (this.stopper.isStopped()) {
           return;
         }
       }
@@ -111,4 +111,4 @@ public class Sleeper {
     }
     triggerWake = false;
   }
-}
+}
\ No newline at end of file
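
Sleeper follows the same AtomicBoolean-to-Stoppable migration as
ReplicationSourceManager above, so a periodic thread wakes promptly on
shutdown instead of polling a shared boolean. A usage sketch; 'owner'
implements Stoppable and doPeriodicWork() is a hypothetical stand-in for the
thread's real work:

    Sleeper sleeper = new Sleeper(1000, owner);   // one-second period
    while (!owner.isStopped()) {
      long start = System.currentTimeMillis();
      doPeriodicWork();
      sleeper.sleep(start);  // sleeps the period minus time already spent
    }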

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracker of cluster settings up in zookeeper.
+ * This is not related to {@link ClusterStatus}.  That class is a data structure
+ * that holds a snapshot of the current view of the cluster.  This class tracks
+ * cluster attributes up in zookeeper.
+ */
+public class ClusterStatusTracker extends ZooKeeperNodeTracker {
+  private static final Log LOG = LogFactory.getLog(ClusterStatusTracker.class);
+
+  /**
+   * Creates a cluster status tracker.
+   *
+   * <p>After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher reference to the zookeeper watcher to register with
+   * @param abortable used to abort on unexpected zookeeper errors
+   */
+  public ClusterStatusTracker(ZooKeeperWatcher watcher, Abortable abortable) {
+    super(watcher, watcher.clusterStateZNode, abortable);
+  }
+
+  /**
+   * Checks if cluster is up.
+   * @return true if root region location is available, false if not
+   */
+  public boolean isClusterUp() {
+    return super.getData() != null;
+  }
+
+  /**
+   * Sets the cluster as up.
+   * @throws KeeperException unexpected zk exception
+   */
+  public void setClusterUp()
+  throws KeeperException {
+    byte [] upData = Bytes.toBytes(new java.util.Date().toString());
+    try {
+      ZKUtil.createAndWatch(watcher, watcher.clusterStateZNode, upData);
+    } catch(KeeperException.NodeExistsException nee) {
+      ZKUtil.setData(watcher, watcher.clusterStateZNode, upData);
+    }
+  }
+
+  /**
+   * Sets the cluster as down by deleting the znode.
+   * @throws KeeperException unexpected zk exception
+   */
+  public void setClusterDown()
+  throws KeeperException {
+    try {
+      ZKUtil.deleteNode(watcher, watcher.clusterStateZNode);
+    } catch(KeeperException.NoNodeException nne) {
+      LOG.warn("Attempted to set cluster as down but already down, cluster " +
+          "state node (" + watcher.clusterStateZNode + ") not found");
+    }
+  }
+}
\ No newline at end of file
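
The tracker's lifecycle, per the javadoc above: construct against the
watcher, call start() (inherited from ZooKeeperNodeTracker) to begin
tracking, then read or flip the state. A sketch, with the watcher and
abortable wiring assumed and KeeperException left to the caller:

    // Sketch only; 'watcher' and 'abortable' come from the surrounding server.
    ClusterStatusTracker tracker = new ClusterStatusTracker(watcher, abortable);
    tracker.start();
    if (!tracker.isClusterUp()) {
      tracker.setClusterUp();  // writes a date string to clusterStateZNode
    }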


