hbase-commits mailing list archives

From ecl...@apache.org
Subject svn commit: r1449950 [23/35] - in /hbase/trunk: ./ hbase-client/ hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/ hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/ hbase-client/src/main/java/org/apache/hadoop/ h...
Date Mon, 25 Feb 2013 22:50:29 GMT
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,1123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class serves as a helper for all things related to zookeeper in
+ * replication.
+ * <p/>
+ * The layout looks something like this under zookeeper.znode.parent for the
+ * master cluster:
+ * <p/>
+ *
+ * <pre>
+ * replication/
+ *  state      {contains true or false}
+ *  clusterId  {contains a byte}
+ *  peers/
+ *    1/   {contains a full cluster address}
+ *      peer-state  {contains ENABLED or DISABLED}
+ *    2/
+ *    ...
+ *  rs/ {lists all RS that replicate}
+ *    startcode1/ {lists all peer clusters}
+ *      1/ {lists hlogs to process}
+ *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
+ *        10.10.1.76%3A53488.123456790
+ *        ...
+ *      2/
+ *      ...
+ *    startcode2/
+ *    ...
+ * </pre>
+ */
+@InterfaceAudience.Private
+public class ReplicationZookeeper implements Closeable {
+  private static final Log LOG =
+    LogFactory.getLog(ReplicationZookeeper.class);
+  // Name of znode we use to lock during failover
+  private final static String RS_LOCK_ZNODE = "lock";
+
+  // Our handle on zookeeper
+  private final ZooKeeperWatcher zookeeper;
+  // Map of peer clusters keyed by their id
+  private Map<String, ReplicationPeer> peerClusters;
+  // Path to the root replication znode
+  private String replicationZNode;
+  // Path to the peer clusters znode
+  private String peersZNode;
+  // Path to the znode that contains all RSs that replicate
+  private String rsZNode;
+  // Path to this region server's name under rsZNode
+  private String rsServerNameZnode;
+  // Name of the replicationState znode
+  private String replicationStateNodeName;
+  // Name of zk node which stores peer state. The peer-state znode is under a
+  // peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
+  private String peerStateNodeName;
+  private final Configuration conf;
+  // The key to our own cluster
+  private String ourClusterKey;
+  // Abortable
+  private Abortable abortable;
+  private final ReplicationStateInterface replicationState;
+
+  /**
+   * ZNode content for the enabled state.
+   */
+  // Public so it can be seen by test code.
+  public static final byte[] ENABLED_ZNODE_BYTES =
+      toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
+
+  /**
+   * ZNode content for the disabled state.
+   */
+  static final byte[] DISABLED_ZNODE_BYTES =
+      toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+
+  /**
+   * Constructor used by clients of replication (like master and HBase clients)
+   * @param abortable abortable to use in case of fatal errors
+   * @param conf  conf to use
+   * @param zk    zk connection to use
+   * @throws KeeperException
+   */
+  public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
+      final ZooKeeperWatcher zk) throws KeeperException {
+    this.conf = conf;
+    this.zookeeper = zk;
+    setZNodes(abortable);
+    this.replicationState =
+        new ReplicationStateImpl(this.zookeeper, getRepStateNode(), abortable, new AtomicBoolean());
+  }
+
+  /**
+   * Constructor used by region servers, connects to the peer cluster right away.
+   *
+   * @param server the server this replication helper runs on
+   * @param replicating atomic boolean to start/stop replication
+   * @throws IOException
+   * @throws KeeperException
+   */
+  public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
+  throws IOException, KeeperException {
+    this.abortable = server;
+    this.zookeeper = server.getZooKeeper();
+    this.conf = server.getConfiguration();
+    setZNodes(server);
+
+    this.replicationState =
+        new ReplicationStateImpl(this.zookeeper, getRepStateNode(), server, replicating);
+    this.peerClusters = new HashMap<String, ReplicationPeer>();
+    ZKUtil.createWithParents(this.zookeeper,
+        ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
+    this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
+    ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
+    connectExistingPeers();
+  }
+
+  private void setZNodes(Abortable abortable) throws KeeperException {
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+    String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
+    this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
+    this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state");
+    String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
+    this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
+    this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
+    this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
+    ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+    this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
+    ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
+  }
+
+  private void connectExistingPeers() throws IOException, KeeperException {
+    List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+    if (znodes != null) {
+      for (String z : znodes) {
+        connectToPeer(z);
+      }
+    }
+  }
+
+  /**
+   * List this cluster's peers' IDs
+   * @return list of all peers' identifiers
+   */
+  public List<String> listPeersIdsAndWatch() {
+    List<String> ids = null;
+    try {
+      ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the list of peers ", e);
+    }
+    return ids;
+  }
+
+  /**
+   * Map of this cluster's peers for display.
+   * @return A map of peer ids to peer cluster keys
+   */
+  public Map<String,String> listPeers() {
+    Map<String,String> peers = new TreeMap<String,String>();
+    List<String> ids = null;
+    try {
+      ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+      for (String id : ids) {
+        byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+        String clusterKey = null;
+        try {
+          clusterKey = parsePeerFrom(bytes);
+        } catch (DeserializationException de) {
+          LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
+          continue;
+        }
+        peers.put(id, clusterKey);
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the list of peers ", e);
+    }
+    return peers;
+  }
+
+  /**
+   * Returns all region servers from the given peer
+   *
+   * @param peerClusterId id of the peer cluster to interrogate
+   * @return addresses of all region servers
+   */
+  public List<ServerName> getSlavesAddresses(String peerClusterId) {
+    if (this.peerClusters.size() == 0) {
+      return Collections.emptyList();
+    }
+    ReplicationPeer peer = this.peerClusters.get(peerClusterId);
+    if (peer == null) {
+      return Collections.emptyList();
+    }
+    
+    List<ServerName> addresses;
+    try {
+      addresses = fetchSlavesAddresses(peer.getZkw());
+    } catch (KeeperException ke) {
+      reconnectPeer(ke, peer);
+      addresses = Collections.emptyList();
+    }
+    peer.setRegionServers(addresses);
+    return peer.getRegionServers();
+  }
+
+  /**
+   * Get the list of all the region servers from the specified peer
+   * @param zkw zk connection to use
+   * @return list of region server addresses or an empty list if the slave
+   * is unavailable
+   */
+  private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
+    throws KeeperException {
+    return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
+  }
+
+  /**
+   * Lists the children of the specified znode, retrieving the data of each
+   * child as a server address.
+   *
+   * Used to list the currently online regionservers and their addresses.
+   *
+   * Sets no watches at all; this method is best effort.
+   *
+   * Returns an empty list if the node has no children or if the parent node
+   * itself does not exist.
+   *
+   * @param zkw zookeeper reference
+   * @param znode node to get children of as addresses
+   * @return server names of the children of the specified znode; empty if the
+   *         znode has no children or does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static List<ServerName> listChildrenAndGetAsServerNames(
+      ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    List<String> children = ZKUtil.listChildrenNoWatch(zkw, znode);
+    if(children == null) {
+      return Collections.emptyList();
+    }
+    List<ServerName> addresses = new ArrayList<ServerName>(children.size());
+    for (String child : children) {
+      addresses.add(ServerName.parseServerName(child));
+    }
+    return addresses;
+  }
+
+  /**
+   * This method connects this cluster to another one and registers it
+   * in this region server's replication znode
+   * @param peerId id of the peer cluster
+   * @return true if the peer was connected and registered, false otherwise
+   * @throws KeeperException
+   */
+  public boolean connectToPeer(String peerId)
+      throws IOException, KeeperException {
+    if (peerClusters == null) {
+      return false;
+    }
+    if (this.peerClusters.containsKey(peerId)) {
+      return false;
+    }
+    ReplicationPeer peer = getPeer(peerId);
+    if (peer == null) {
+      return false;
+    }
+    this.peerClusters.put(peerId, peer);
+    ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
+        this.rsServerNameZnode, peerId));
+    LOG.info("Added new peer cluster " + peer.getClusterKey());
+    return true;
+  }
+
+  /**
+   * Helper method to connect to a peer
+   * @param peerId peer's identifier
+   * @return object representing the peer
+   * @throws IOException
+   * @throws KeeperException
+   */
+  public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
+    String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
+    byte [] data = ZKUtil.getData(this.zookeeper, znode);
+    String otherClusterKey = "";
+    try {
+      otherClusterKey = parsePeerFrom(data);
+    } catch (DeserializationException e) {
+      LOG.warn("Failed parse of cluster key from peerId=" + peerId
+          + ", specifically the content from the following znode: " + znode);
+    }
+    if (this.ourClusterKey.equals(otherClusterKey)) {
+      LOG.debug("Not connecting to " + peerId + " because it's us");
+      return null;
+    }
+    // Construct the connection to the new peer
+    Configuration otherConf = new Configuration(this.conf);
+    try {
+      ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
+    } catch (IOException e) {
+      LOG.error("Can't get peer because:", e);
+      return null;
+    }
+
+    ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
+        otherClusterKey);
+    peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+    return peer;
+  }
+
+  /**
+   * Remove the peer from zookeeper, which will trigger the watchers on every
+   * region server and close their sources
+   * @param id the peer's identifier
+   * @throws IllegalArgumentException Thrown when the peer doesn't exist
+   */
+  public void removePeer(String id) throws IOException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("Cannot remove inexisting peer");
+      }
+      ZKUtil.deleteNodeRecursively(this.zookeeper,
+          ZKUtil.joinZNode(this.peersZNode, id));
+    } catch (KeeperException e) {
+      throw new IOException("Unable to remove a peer", e);
+    }
+  }
+
+  /**
+   * Add a new peer to this cluster
+   * @param id peer's identifier
+   * @param clusterKey ZK ensemble's addresses, client port and root znode
+   * @throws IllegalArgumentException Thrown when the peer already exists
+   * @throws IOException Thrown when the ZooKeeper operation fails
+   */
+  public void addPeer(String id, String clusterKey) throws IOException {
+    try {
+      if (peerExists(id)) {
+        throw new IllegalArgumentException("Cannot add existing peer");
+      }
+      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+      ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
+        toByteArray(clusterKey));
+      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+      // peer-state znode. This happens while adding a peer.
+      // The peer-state data is set as "ENABLED" by default; i.e. a peer is enabled by default.
+      ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
+        ENABLED_ZNODE_BYTES);
+    } catch (KeeperException e) {
+      throw new IOException("Unable to add peer", e);
+    }
+  }
+
+  /**
+   * @param clusterKey the peer cluster key to serialize
+   * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix
+   *         prepended, suitable for use as content of a peer znode under
+   *         this.peersZNode; i.e. of /hbase/replication/peers/PEER_ID
+   */
+  static byte[] toByteArray(final String clusterKey) {
+    byte[] bytes = ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
+        .toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  /**
+   * @param state the replication state to serialize
+   * @return Serialized protobuf of <code>state</code> with pb magic prefix
+   *         prepended, suitable for use as content of either the cluster state
+   *         znode -- whether or not we should be replicating, kept in
+   *         /hbase/replication/state -- or as content of a peer-state znode
+   *         under a peer cluster id, as in
+   *         /hbase/replication/peers/PEER_ID/peer-state.
+   */
+  static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
+    byte[] bytes = ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build()
+        .toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  /**
+   * @param position the hlog position to serialize
+   * @return Serialized protobuf of <code>position</code> with pb magic prefix
+   *         prepended, suitable for use as content of an hlog position znode in a
+   *         replication queue.
+   */
+  static byte[] toByteArray(final long position) {
+    byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
+        .build().toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  /**
+   * @param lockOwner the name of the lock owner to serialize
+   * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix
+   *         prepended, suitable for use as content of a replication lock during
+   *         region server failover.
+   */
+  static byte[] lockToByteArray(final String lockOwner) {
+    byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
+        .toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+  /**
+   * @param bytes Content of a peer znode.
+   * @return ClusterKey parsed from the passed bytes.
+   * @throws DeserializationException
+   */
+  static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
+    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer
+          .newBuilder();
+      ZooKeeperProtos.ReplicationPeer peer;
+      try {
+        peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new DeserializationException(e);
+      }
+      return peer.getClusterkey();
+    } else {
+      if (bytes.length > 0) {
+        return Bytes.toString(bytes);
+      }
+      return "";
+    }
+  }
+
+  /**
+   * @param bytes Content of a state znode.
+   * @return State parsed from the passed bytes.
+   * @throws DeserializationException
+   */
+  static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(bytes);
+    int pblen = ProtobufUtil.lengthOfPBMagic();
+    ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
+        .newBuilder();
+    ZooKeeperProtos.ReplicationState state;
+    try {
+      state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      return state.getState();
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * @param bytes Content of an HLog position znode.
+   * @return The current HLog position.
+   * @throws DeserializationException
+   */
+  static long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
+    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.ReplicationHLogPosition.Builder builder = ZooKeeperProtos.ReplicationHLogPosition
+          .newBuilder();
+      ZooKeeperProtos.ReplicationHLogPosition position;
+      try {
+        position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new DeserializationException(e);
+      }
+      return position.getPosition();
+    } else {
+      if (bytes.length > 0) {
+        return Bytes.toLong(bytes);
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * @param bytes Content of a lock znode.
+   * @return The owner of the lock.
+   * @throws DeserializationException
+   */
+  static String parseLockOwnerFrom(final byte[] bytes) throws DeserializationException {
+    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
+          .newBuilder();
+      ZooKeeperProtos.ReplicationLock lock;
+      try {
+        lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      } catch (InvalidProtocolBufferException e) {
+        throw new DeserializationException(e);
+      }
+      return lock.getLockOwner();
+    } else {
+      if (bytes.length > 0) {
+        return Bytes.toString(bytes);
+      }
+      return "";
+    }
+  }
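+
+  // Illustrative round trip of the framing helpers above (values are
+  // hypothetical): toByteArray and the parse methods agree on the pb magic
+  // prefix, and the parsers also accept legacy non-pb znode content.
+  //   byte[] b = toByteArray(1024L);               // pb magic + ReplicationHLogPosition
+  //   parseHLogPositionFrom(b);                    // => 1024
+  //   parseHLogPositionFrom(Bytes.toBytes(1024L)); // => 1024 via the non-pb fallback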
+
+  private boolean peerExists(String id) throws KeeperException {
+    return ZKUtil.checkExists(this.zookeeper,
+          ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
+  }
+
+  /**
+   * Enable replication to the peer
+   *
+   * @param id peer's identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public void enablePeer(String id) throws IOException {
+    changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
+    LOG.info("peer " + id + " is enabled");
+  }
+
+  /**
+   * Disable replication to the peer
+   *
+   * @param id peer's identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public void disablePeer(String id) throws IOException {
+    changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
+    LOG.info("peer " + id + " is disabled");
+  }
+
+  private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
+      throws IOException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("peer " + id + " is not registered");
+      }
+      String peerStateZNode = getPeerStateNode(id);
+      byte[] stateBytes = (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
+          : DISABLED_ZNODE_BYTES;
+      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
+        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
+      } else {
+        ZKUtil.createAndWatch(zookeeper, peerStateZNode, stateBytes);
+      }
+      LOG.info("state of the peer " + id + " changed to " + state.name());
+    } catch (KeeperException e) {
+      throw new IOException("Unable to change state of the peer " + id, e);
+    }
+  }
+
+  /**
+   * Check whether the peer is enabled or not. This method checks the atomic
+   * boolean of ReplicationPeer locally.
+   *
+   * @param id peer identifier
+   * @return true if the peer is enabled, otherwise false
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public boolean getPeerEnabled(String id) {
+    if (!this.peerClusters.containsKey(id)) {
+      throw new IllegalArgumentException("peer " + id + " is not registered");
+    }
+    return this.peerClusters.get(id).getPeerEnabled().get();
+  }
+
+  private String getPeerStateNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+  }
+
+  private String getRepStateNode() {
+    return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
+  }
+
+  /**
+   * Get the replication status of this cluster. If the state znode doesn't exist it will also
+   * create it and set it to true.
+   * @return true when replication is enabled, else false
+   * @throws KeeperException
+   */
+  public boolean getReplication() throws KeeperException {
+    return this.replicationState.getState();
+  }
+
+  /**
+   * Set the new replication state for this cluster
+   * @param newState the new replication state to set
+   * @throws KeeperException
+   */
+  public void setReplication(boolean newState) throws KeeperException {
+    this.replicationState.setState(newState);
+  }
+
+  /**
+   * Add a new log to the list of hlogs in zookeeper
+   * @param filename name of the hlog's znode
+   * @param peerId name of the cluster's znode
+   */
+  public void addLogToList(String filename, String peerId)
+    throws KeeperException {
+    String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId);
+    znode = ZKUtil.joinZNode(znode, filename);
+    ZKUtil.createWithParents(this.zookeeper, znode);
+  }
+
+  /**
+   * Remove a log from the list of hlogs in zookeeper
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   */
+  public void removeLogFromList(String filename, String clusterId) {
+    try {
+      String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      ZKUtil.deleteNode(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed remove from list", e);
+    }
+  }
+
+  /**
+   * Set the current position of the specified cluster in the current hlog
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   * @param position the position in the file
+   */
+  public void writeReplicationStatus(String filename, String clusterId,
+      long position) {
+    try {
+      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
+      znode = ZKUtil.joinZNode(znode, filename);
+      // Why serialize a String of the Long and not the Long as bytes?
+      ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
+    } catch (KeeperException e) {
+      this.abortable.abort("Writing replication status", e);
+    }
+  }
+
+  /**
+   * Get a list of all the other region servers in this cluster
+   * and set a watch
+   * @return a list of server names
+   */
+  public List<String> getRegisteredRegionServers() {
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenAndWatchThem(
+          this.zookeeper, this.zookeeper.rsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of registered region servers", e);
+    }
+    return result;
+  }
+
+  /**
+   * Get the list of replicators that have queues; they can be alive, dead
+   * or simply from a previous run
+   * @return a list of server names
+   */
+  public List<String> getListOfReplicators() {
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of replicators", e);
+    }
+    return result;
+  }
+
+  /**
+   * Get the list of peer clusters for the specified region server
+   * @param rs server name of the region server
+   * @return a list of peer clusters
+   */
+  public List<String> getListPeersForRS(String rs) {
+    String znode = ZKUtil.joinZNode(rsZNode, rs);
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of peers for rs", e);
+    }
+    return result;
+  }
+
+  /**
+   * Get the list of hlogs for the specified region server and peer cluster
+   * @param rs server name of the region server
+   * @param id peer cluster
+   * @return a list of hlogs
+   */
+  public List<String> getListHLogsForPeerForRS(String rs, String id) {
+    String znode = ZKUtil.joinZNode(rsZNode, rs);
+    znode = ZKUtil.joinZNode(znode, id);
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Get list of hlogs for peer", e);
+    }
+    return result;
+  }
+
+  /**
+   * Try to set a lock in another server's znode.
+   * @param znode the server name of the other server
+   * @return true if the lock was acquired, false in every other case
+   */
+  public boolean lockOtherRS(String znode) {
+    try {
+      String parent = ZKUtil.joinZNode(this.rsZNode, znode);
+      if (parent.equals(rsServerNameZnode)) {
+        LOG.warn("Won't lock because this is us, we're dead!");
+        return false;
+      }
+      String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
+      ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(rsServerNameZnode));
+    } catch (KeeperException e) {
+      // This exception will pop up if the znode under which we're trying to
+      // create the lock is already deleted by another region server, meaning
+      // that the transfer already occurred.
+      // NoNode => transfer is done and znodes are already deleted
+      // NodeExists => lock znode already created by another RS
+      if (e instanceof KeeperException.NoNodeException ||
+          e instanceof KeeperException.NodeExistsException) {
+        LOG.info("Won't transfer the queue," +
+            " another RS took care of it because of: " + e.getMessage());
+      } else {
+        LOG.info("Failed lock other rs", e);
+      }
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * It "atomically" copies all the hlogs queues from another region server and returns them all
+   * sorted per peer cluster (appended with the dead server's znode).
+   * @param znode
+   * @return HLog queues sorted per peer cluster
+   */
+  public SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
+    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+    String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs
+    List<String> peerIdsToProcess = null;
+    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+    try {
+      peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
+      if (peerIdsToProcess == null) return null; // node already processed
+      for (String peerId : peerIdsToProcess) {
+        String newPeerId = peerId + "-" + znode;
+        String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
+        // check the logs queue for the old peer cluster
+        String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+        if (hlogs == null || hlogs.size() == 0) continue; // empty log queue.
+        // create the new cluster znode
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newPeerId, logQueue);
+        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
+        listOfOps.add(op);
+        // get the offset of the logs and set it to new znodes
+        for (String hlog : hlogs) {
+          String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
+          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
+          LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
+          String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
+          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
+          // add ops for deleting
+          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
+          logQueue.add(hlog);
+        }
+        // add delete op for peer
+        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+      }
+      // add delete op for dead rs
+      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
+      LOG.debug(" The multi list size is: " + listOfOps.size());
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+      LOG.info("Atomically moved the dead regionserver logs. ");
+    } catch (KeeperException e) {
+      // Multi call failed; it looks like some other regionserver took away the logs.
+      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
+    }
+    return queues;
+  }
+
+  /**
+   * This method copies all the hlog queues from another region server
+   * and returns them all sorted per peer cluster (appended with the dead
+   * server's znode)
+   * @param znode znode of the server whose queues to copy
+   * @return all hlogs for all peers of that cluster, null if an error occurred
+   */
+  public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+    // TODO this method isn't atomic enough, we could start copying and then
+    // TODO fail for some reason and we would end up with znodes we don't want.
+    SortedMap<String,SortedSet<String>> queues =
+        new TreeMap<String,SortedSet<String>>();
+    try {
+      String nodePath = ZKUtil.joinZNode(rsZNode, znode);
+      List<String> clusters =
+        ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
+      // We have a lock znode in there, it will count as one.
+      if (clusters == null || clusters.size() <= 1) {
+        return queues;
+      }
+      // The lock isn't a peer cluster, remove it
+      clusters.remove(RS_LOCK_ZNODE);
+      for (String cluster : clusters) {
+        // We add the name of the recovered RS to the new znode, we can even
+        // do that for queues that were recovered 10 times giving a znode like
+        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+        String newCluster = cluster+"-"+znode;
+        String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster);
+        String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
+        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
+        // That region server didn't have anything to replicate for this cluster
+        if (hlogs == null || hlogs.size() == 0) {
+          continue;
+        }
+        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
+            HConstants.EMPTY_BYTE_ARRAY);
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newCluster, logQueue);
+        for (String hlog : hlogs) {
+          String z = ZKUtil.joinZNode(clusterPath, hlog);
+          byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
+          long position = 0;
+          try {
+            position = parseHLogPositionFrom(positionBytes);
+          } catch (DeserializationException e) {
+            LOG.warn("Failed parse of hlog position from the following znode: " + z);
+          }
+          LOG.debug("Creating " + hlog + " with data " + position);
+          String child = ZKUtil.joinZNode(newClusterZnode, hlog);
+          // Position doesn't actually change, we are just deserializing it for
+          // logging, so just use the already serialized version
+          ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
+          logQueue.add(hlog);
+        }
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Copy queues from rs", e);
+    }
+    return queues;
+  }
+
+  /**
+   * Delete a complete queue of hlogs
+   * @param peerZnode znode of the peer cluster queue of hlogs to delete
+   * @param closeConnection whether to also close the connection to the peer cluster
+   */
+  public void deleteSource(String peerZnode, boolean closeConnection) {
+    try {
+      ZKUtil.deleteNodeRecursively(this.zookeeper,
+          ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
+      if (closeConnection) {
+        this.peerClusters.get(peerZnode).getZkw().close();
+        this.peerClusters.remove(peerZnode);
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed delete of " + peerZnode, e);
+    }
+  }
+
+  /**
+   * Recursive deletion of all znodes in specified rs' znode
+   * @param znode znode of the region server whose queues to delete
+   */
+  public void deleteRsQueues(String znode) {
+    String fullpath = ZKUtil.joinZNode(rsZNode, znode);
+    try {
+      List<String> clusters =
+        ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
+      for (String cluster : clusters) {
+        // We'll delete it later
+        if (cluster.equals(RS_LOCK_ZNODE)) {
+          continue;
+        }
+        String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
+        ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
+      }
+      // Finish cleaning up
+      ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
+    } catch (KeeperException e) {
+      if (e instanceof KeeperException.NoNodeException ||
+          e instanceof KeeperException.NotEmptyException) {
+        // Testing a special case where another region server was able to
+        // create a lock just after we deleted it, but then was also able to
+        // delete the RS znode before us or its lock znode is still there.
+        if (e.getPath().equals(fullpath)) {
+          return;
+        }
+      }
+      this.abortable.abort("Failed delete of " + znode, e);
+    }
+  }
+
+  /**
+   * Delete this region server's queues
+   */
+  public void deleteOwnRSZNode() {
+    try {
+      ZKUtil.deleteNodeRecursively(this.zookeeper,
+          this.rsServerNameZnode);
+    } catch (KeeperException e) {
+      // if the znode is already expired, don't bother going further
+      if (e instanceof KeeperException.SessionExpiredException) {
+        return;
+      }
+      this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e);
+    }
+  }
+
+  /**
+   * Get the position of the specified hlog in the specified peer znode
+   * @param peerId znode of the peer cluster
+   * @param hlog name of the hlog
+   * @return the position in that hlog
+   * @throws KeeperException 
+   */
+  public long getHLogRepPosition(String peerId, String hlog)
+  throws KeeperException {
+    String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
+    String znode = ZKUtil.joinZNode(clusterZnode, hlog);
+    byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
+    try {
+      return parseHLogPositionFrom(bytes);
+    } catch (DeserializationException de) {
+      LOG.warn("Failed parse of HLogPosition for peerId=" + peerId + " and hlog=" + hlog
+          + "znode content, continuing.");
+    }
+    // if we can not parse the position, start at the beginning of the hlog file
+    // again
+    return 0;
+  }
+
+  /**
+   * Returns the UUID of the provided peer id. Should a connection loss or session
+   * expiration happen, the ZK handler will be reopened once and if it still doesn't
+   * work then it will bail and return null.
+   * @param peerId the peer's ID that will be converted into a UUID
+   * @return a UUID or null if there's a ZK connection issue
+   */
+  public UUID getPeerUUID(String peerId) {
+    ReplicationPeer peer = getPeerClusters().get(peerId);
+    UUID peerUUID = null;
+    try {
+      peerUUID = getUUIDForCluster(peer.getZkw());
+    } catch (KeeperException ke) {
+      reconnectPeer(ke, peer);
+    }
+    return peerUUID;
+  }
+
+  /**
+   * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
+   * @param zkw watcher connected to an ensemble
+   * @return the UUID read from zookeeper
+   * @throws KeeperException
+   */
+  public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
+    return UUID.fromString(ZKClusterId.readClusterIdZNode(zkw));
+  }
+
+  private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
+    if (ke instanceof ConnectionLossException
+      || ke instanceof SessionExpiredException) {
+      LOG.warn(
+        "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
+        ke);
+      try {
+        peer.reloadZkWatcher();
+      } catch(IOException io) {
+        LOG.warn(
+          "Creation of ZookeeperWatcher failed for peer "
+            + peer.getClusterKey(), io);
+      }
+    }
+  }
+
+  public void registerRegionServerListener(ZooKeeperListener listener) {
+    this.zookeeper.registerListener(listener);
+  }
+
+  /**
+   * Get a map of all peer clusters
+   * @return map of peer clusters keyed by id
+   */
+  public Map<String, ReplicationPeer> getPeerClusters() {
+    return this.peerClusters;
+  }
+  
+  /**
+   * Determine if a ZK path points to a peer node.
+   * @param path path to be checked
+   * @return true if the path points to a peer node, otherwise false
+   */
+  public boolean isPeerPath(String path) {
+    return path.split("/").length == peersZNode.split("/").length + 1;
+  }
+
+  /**
+   * Extracts the znode name of a peer cluster from a ZK path
+   * @param fullPath Path to extract the id from
+   * @return the id or an empty string if path is invalid
+   */
+  public static String getZNodeName(String fullPath) {
+    String[] parts = fullPath.split("/");
+    return parts.length > 0 ? parts[parts.length-1] : "";
+  }
+
+  /**
+   * Get this cluster's zk connection
+   * @return zk connection
+   */
+  public ZooKeeperWatcher getZookeeperWatcher() {
+    return this.zookeeper;
+  }
+
+
+  /**
+   * Get the full path to the peers' znode
+   * @return path to peers in zk
+   */
+  public String getPeersZNode() {
+    return peersZNode;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (replicationState != null) replicationState.close();
+  }
+
+  /**
+   * Utility method to ensure an ENABLED znode is in place; if not present, we
+   * create it.
+   * @param zookeeper zk connection to use
+   * @param path Path to znode to check
+   * @return True if we created the znode.
+   * @throws NodeExistsException
+   * @throws KeeperException
+   */
+  static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+      throws NodeExistsException, KeeperException {
+    if (ZKUtil.checkExists(zookeeper, path) == -1) {
+      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+      // peer-state znode. This happens while adding a peer.
+      // The peer state data is set as "ENABLED" by default.
+      ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * @param bytes serialized ReplicationState znode content
+   * @return True if the passed in <code>bytes</code> are those of a pb
+   *         serialized ENABLED state.
+   * @throws DeserializationException
+   */
+  static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+    ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
+    return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
+  }
+}
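
For illustration, a minimal sketch of how a client might drive this class; the peer id, cluster key and surrounding objects (conf, zk, abortable) are hypothetical placeholders, and error handling is elided:

    ReplicationZookeeper repZk = new ReplicationZookeeper(abortable, conf, zk);
    repZk.addPeer("1", "zk-peer.example.org:2181:/hbase"); // ensemble:clientPort:rootZNode
    repZk.enablePeer("1");                                 // writes ENABLED to the peer-state znode
    System.out.println(repZk.listPeers());                 // {1=zk-peer.example.org:2181:/hbase}
    repZk.close();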

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AuthMethod.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AuthMethod.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AuthMethod.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AuthMethod.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/** Authentication method */
+public enum AuthMethod {
+  SIMPLE((byte) 80, "", UserGroupInformation.AuthenticationMethod.SIMPLE),
+  KERBEROS((byte) 81, "GSSAPI", UserGroupInformation.AuthenticationMethod.KERBEROS),
+  DIGEST((byte) 82, "DIGEST-MD5", UserGroupInformation.AuthenticationMethod.TOKEN);
+
+  /** The code for this method. */
+  public final byte code;
+  public final String mechanismName;
+  public final UserGroupInformation.AuthenticationMethod authenticationMethod;
+
+  AuthMethod(byte code, String mechanismName,
+             UserGroupInformation.AuthenticationMethod authMethod) {
+    this.code = code;
+    this.mechanismName = mechanismName;
+    this.authenticationMethod = authMethod;
+  }
+
+  private static final int FIRST_CODE = values()[0].code;
+
+  /** Return the object represented by the code. */
+  private static AuthMethod valueOf(byte code) {
+    final int i = (code & 0xff) - FIRST_CODE;
+    return i < 0 || i >= values().length ? null : values()[i];
+  }
+
+  /** Return the SASL mechanism name */
+  public String getMechanismName() {
+    return mechanismName;
+  }
+
+  /** Read from in */
+  public static AuthMethod read(DataInput in) throws IOException {
+    return valueOf(in.readByte());
+  }
+
+  /** Write to out */
+  public void write(DataOutput out) throws IOException {
+    out.write(code);
+  }
+}
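
For illustration, a hypothetical round trip through the wire format, showing that the one-byte code written by write() maps back to the same constant via read():

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    AuthMethod.KERBEROS.write(new DataOutputStream(buf));   // writes code 81
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    assert AuthMethod.read(in) == AuthMethod.KERBEROS;      // valueOf(81) => KERBEROS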

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslOutputStream;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * A utility class that encapsulates SASL logic for RPC client.
+ * Copied from <code>org.apache.hadoop.security</code>
+ */
+public class HBaseSaslRpcClient {
+  public static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
+
+  private final SaslClient saslClient;
+
+  /**
+   * Create a HBaseSaslRpcClient for an authentication method
+   * 
+   * @param method
+   *          the requested authentication method
+   * @param token
+   *          token to use if needed by the authentication method
+   * @param serverPrincipal
+   *          the server's Kerberos principal name (required for KERBEROS)
+   */
+  public HBaseSaslRpcClient(AuthMethod method,
+      Token<? extends TokenIdentifier> token, String serverPrincipal)
+      throws IOException {
+    switch (method) {
+    case DIGEST:
+      if (LOG.isDebugEnabled())
+        LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
+            + " client to authenticate to service at " + token.getService());
+      saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST
+          .getMechanismName() }, null, null, SaslUtil.SASL_DEFAULT_REALM,
+          SaslUtil.SASL_PROPS, new SaslClientCallbackHandler(token));
+      break;
+    case KERBEROS:
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
+            + " client. Server's Kerberos principal name is " + serverPrincipal);
+      }
+      if (serverPrincipal == null || serverPrincipal.length() == 0) {
+        throw new IOException(
+            "Failed to specify server's Kerberos principal name");
+      }
+      String[] names = SaslUtil.splitKerberosName(serverPrincipal);
+      if (names.length != 3) {
+        throw new IOException(
+          "Kerberos principal does not have the expected format: "
+                + serverPrincipal);
+      }
+      saslClient = Sasl.createSaslClient(new String[] { AuthMethod.KERBEROS
+          .getMechanismName() }, null, names[0], names[1],
+          SaslUtil.SASL_PROPS, null);
+      break;
+    default:
+      throw new IOException("Unknown authentication method " + method);
+    }
+    if (saslClient == null)
+      throw new IOException("Unable to find SASL client implementation");
+  }
+
+  private static void readStatus(DataInputStream inStream) throws IOException {
+    int status = inStream.readInt(); // read status
+    if (status != SaslStatus.SUCCESS.state) {
+      throw new RemoteException(WritableUtils.readString(inStream),
+          WritableUtils.readString(inStream));
+    }
+  }
+  
+  /**
+   * Do client side SASL authentication with server via the given InputStream
+   * and OutputStream
+   * 
+   * @param inS
+   *          InputStream to use
+   * @param outS
+   *          OutputStream to use
+   * @return true if the connection is set up, or false if it needs to switch
+   *             to simple auth.
+   * @throws IOException
+   */
+  public boolean saslConnect(InputStream inS, OutputStream outS)
+      throws IOException {
+    DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
+    DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
+        outS));
+
+    try {
+      byte[] saslToken = new byte[0];
+      if (saslClient.hasInitialResponse())
+        saslToken = saslClient.evaluateChallenge(saslToken);
+      if (saslToken != null) {
+        outStream.writeInt(saslToken.length);
+        outStream.write(saslToken, 0, saslToken.length);
+        outStream.flush();
+        if (LOG.isDebugEnabled())
+          LOG.debug("Have sent token of size " + saslToken.length
+              + " from initSASLContext.");
+      }
+      if (!saslClient.isComplete()) {
+        readStatus(inStream);
+        int len = inStream.readInt();
+        if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("Server asks us to fall back to simple auth.");
+          saslClient.dispose();
+          return false;
+        }
+        saslToken = new byte[len];
+        if (LOG.isDebugEnabled())
+          LOG.debug("Will read input token of size " + saslToken.length
+              + " for processing by initSASLContext");
+        inStream.readFully(saslToken);
+      }
+
+      while (!saslClient.isComplete()) {
+        saslToken = saslClient.evaluateChallenge(saslToken);
+        if (saslToken != null) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("Will send token of size " + saslToken.length
+                + " from initSASLContext.");
+          outStream.writeInt(saslToken.length);
+          outStream.write(saslToken, 0, saslToken.length);
+          outStream.flush();
+        }
+        if (!saslClient.isComplete()) {
+          readStatus(inStream);
+          saslToken = new byte[inStream.readInt()];
+          if (LOG.isDebugEnabled())
+            LOG.debug("Will read input token of size " + saslToken.length
+                + " for processing by initSASLContext");
+          inStream.readFully(saslToken);
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client context established. Negotiated QoP: "
+            + saslClient.getNegotiatedProperty(Sasl.QOP));
+      }
+      return true;
+    } catch (IOException e) {
+      try {
+        saslClient.dispose();
+      } catch (SaslException ignored) {
+        // ignore further exceptions during cleanup
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
+   * been called.
+   * 
+   * @param in
+   *          the InputStream to wrap
+   * @return a SASL wrapped InputStream
+   * @throws IOException
+   */
+  public InputStream getInputStream(InputStream in) throws IOException {
+    if (!saslClient.isComplete()) {
+      throw new IOException("Sasl authentication exchange hasn't completed yet");
+    }
+    return new SaslInputStream(in, saslClient);
+  }
+
+  /**
+   * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
+   * been called.
+   * 
+   * @param out
+   *          the OutputStream to wrap
+   * @return a SASL wrapped OutputStream
+   * @throws IOException
+   */
+  public OutputStream getOutputStream(OutputStream out) throws IOException {
+    if (!saslClient.isComplete()) {
+      throw new IOException("Sasl authentication exchange hasn't completed yet");
+    }
+    return new SaslOutputStream(out, saslClient);
+  }
+
+  /** Release resources used by wrapped saslClient */
+  public void dispose() throws SaslException {
+    saslClient.dispose();
+  }
+
+  private static class SaslClientCallbackHandler implements CallbackHandler {
+    private final String userName;
+    private final char[] userPassword;
+
+    public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+      this.userName = SaslUtil.encodeIdentifier(token.getIdentifier());
+      this.userPassword = SaslUtil.encodePassword(token.getPassword());
+    }
+
+    public void handle(Callback[] callbacks)
+        throws UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("SASL client callback: setting username: " + userName);
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("SASL client callback: setting userPassword");
+        pc.setPassword(userPassword);
+      }
+      if (rc != null) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("SASL client callback: setting realm: "
+              + rc.getDefaultText());
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+}
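
A minimal sketch of how a caller would use the wrapped streams above once the SASL handshake has succeeded; the socket, the request/response buffers, and the saslRpcClient variable are hypothetical stand-ins, not part of this commit:

    // Hypothetical caller; 'saslRpcClient' is an instance of the client class above.
    InputStream in = saslRpcClient.getInputStream(socket.getInputStream());
    OutputStream out = saslRpcClient.getOutputStream(socket.getOutputStream());
    try {
      out.write(requestBytes);       // payload is wrapped per the negotiated QoP
      out.flush();
      int n = in.read(responseBuf);  // responses are unwrapped transparently
    } finally {
      saslRpcClient.dispose();       // releases the underlying SaslClient;
    }                                // dispose() throws SaslException, so the
                                     // enclosing method must handle or declare it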

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/KerberosInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/KerberosInfo.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/KerberosInfo.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/KerberosInfo.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Indicates Kerberos related information to be used for authorizing connections
+ * over a given RPC protocol interface.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+@InterfaceAudience.Private
+public @interface KerberosInfo {
+  /** Key for getting server's Kerberos principal name from Configuration */
+  String serverPrincipal();
+  /** Key for getting client's Kerberos principal name from Configuration; empty if unused */
+  String clientPrincipal() default "";
+}
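
Illustrative usage of the annotation; the interface name and the configuration key below are hypothetical examples, not part of this commit:

    // The RPC layer reads the annotation and resolves the server's principal
    // from the Configuration entry named by serverPrincipal.
    @KerberosInfo(serverPrincipal = "hbase.regionserver.kerberos.principal")
    public interface ExampleRegionServerProtocol {
      // ... RPC methods ...
    }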

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslStatus.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslStatus.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslStatus.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslStatus.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,30 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+public enum SaslStatus {
+  SUCCESS (0),
+  ERROR (1);
+
+  public final int state;
+  SaslStatus(int state) {
+    this.state = state;
+  }
+}
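
A short sketch of how these codes would typically travel on the wire; the helper methods are hypothetical (assuming java.io.DataOutput/DataInput), shown only to make the int mapping concrete:

    static void writeStatus(DataOutput out, SaslStatus status) throws IOException {
      out.writeInt(status.state);               // SUCCESS -> 0, ERROR -> 1
    }

    static boolean readIsSuccess(DataInput in) throws IOException {
      return in.readInt() == SaslStatus.SUCCESS.state;
    }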

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.commons.codec.binary.Base64;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class SaslUtil {
+  public static final String SASL_DEFAULT_REALM = "default";
+  /** Shared SASL properties (e.g. quality of protection); populated by the RPC layer. */
+  public static final Map<String, String> SASL_PROPS =
+      new TreeMap<String, String>();
+  /** Sentinel sent by the server in place of a token length to request simple auth. */
+  public static final int SWITCH_TO_SIMPLE_AUTH = -88;
+
+  /** Splits a fully qualified Kerberos name into its component parts. */
+  public static String[] splitKerberosName(String fullName) {
+    return fullName.split("[/@]");
+  }
+
+  static String encodeIdentifier(byte[] identifier) {
+    return new String(Base64.encodeBase64(identifier));
+  }
+
+  static byte[] decodeIdentifier(String identifier) {
+    return Base64.decodeBase64(identifier.getBytes());
+  }
+
+  static char[] encodePassword(byte[] password) {
+    return new String(Base64.encodeBase64(password)).toCharArray();
+  }
+}
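
Worked examples for the helpers above. Note the Base64 codec methods are package-private, so the second snippet only compiles from within org.apache.hadoop.hbase.security; the input bytes are a hypothetical placeholder:

    String[] parts = SaslUtil.splitKerberosName("hbase/host.example.com@EXAMPLE.COM");
    // parts == { "hbase", "host.example.com", "EXAMPLE.COM" }

    // From within the same package:
    byte[] id = tokenIdentifierBytes;               // hypothetical input
    String enc = SaslUtil.encodeIdentifier(id);     // Base64 text for SASL callbacks
    byte[] back = SaslUtil.decodeIdentifier(enc);   // round-trips to the original bytes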

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecureBulkLoadUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecureBulkLoadUtil.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecureBulkLoadUtil.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecureBulkLoadUtil.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class SecureBulkLoadUtil {
+  private final static String BULKLOAD_STAGING_DIR = "hbase.bulkload.staging.dir";
+
+  /**
+   * Returns the staging path for a given column family.  This is needed for
+   * clean recovery and is called reflectively from LoadIncrementalHFiles.
+   */
+  public static Path getStagingPath(Configuration conf, String bulkToken, byte[] family) {
+    Path stageP = new Path(getBaseStagingDir(conf), bulkToken);
+    return new Path(stageP, Bytes.toString(family));
+  }
+
+  public static Path getBaseStagingDir(Configuration conf) {
+    return new Path(conf.get(BULKLOAD_STAGING_DIR, "/tmp/hbase-staging"));
+  }
+}
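
A worked example of the path construction, assuming hbase.bulkload.staging.dir is unset so the /tmp/hbase-staging default applies; the bulk token value is hypothetical:

    Configuration conf = new Configuration();
    Path p = SecureBulkLoadUtil.getStagingPath(conf, "bulk_token_1234", Bytes.toBytes("cf"));
    // p == /tmp/hbase-staging/bulk_token_1234/cf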

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/TokenInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/TokenInfo.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/TokenInfo.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/TokenInfo.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Indicates Token related information to be used in authorizing connections
+ * over a given RPC protocol interface.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+@InterfaceAudience.Private
+public @interface TokenInfo {
+  /** The token kind, as returned by Token.getKind(), to be handled */
+  String value();
+}
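
Illustrative usage; the interface name is hypothetical, and the token kind string is assumed to match whatever Token.getKind() returns for the tokens the protocol should accept:

    @TokenInfo("HBASE_AUTH_TOKEN")
    public interface ExampleClientProtocol {
      // ... RPC methods callable with a delegation token of that kind ...
    }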

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/User.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/User.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/User.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/User.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,406 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.util.Methods;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Wrapper to abstract out usage of user and group information in HBase.
+ *
+ * <p>
+ * This class provides a common interface for interacting with user and group
+ * information across changing APIs in different versions of Hadoop.  It only
+ * provides access to the common set of functionality in
+ * {@link org.apache.hadoop.security.UserGroupInformation} currently needed by
+ * HBase, but can be extended as needs change.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class User {
+  public static final String HBASE_SECURITY_CONF_KEY =
+      "hbase.security.authentication";
+
+  private static final Log LOG = LogFactory.getLog(User.class);
+
+  protected UserGroupInformation ugi;
+
+  public UserGroupInformation getUGI() {
+    return ugi;
+  }
+
+  /**
+   * Returns the full user name.  For Kerberos principals this will include
+   * the host and realm portions of the principal name.
+   * @return User full name.
+   */
+  public String getName() {
+    return ugi.getUserName();
+  }
+
+  /**
+   * Returns the list of groups of which this user is a member.  On secure
+   * Hadoop this returns the group information for the user as resolved on the
+   * server.  For 0.20 based Hadoop, the group names are passed from the client.
+   */
+  public String[] getGroupNames() {
+    return ugi.getGroupNames();
+  }
+
+  /**
+   * Returns the shortened version of the user name -- the portion that maps
+   * to an operating system user name.
+   * @return Short name
+   */
+  public abstract String getShortName();
+
+  /**
+   * Executes the given action within the context of this user.
+   */
+  public abstract <T> T runAs(PrivilegedAction<T> action);
+
+  /**
+   * Executes the given action within the context of this user.
+   */
+  public abstract <T> T runAs(PrivilegedExceptionAction<T> action)
+      throws IOException, InterruptedException;
+
+  /**
+   * Requests an authentication token for this user and stores it in the
+   * user's credentials.
+   *
+   * @throws IOException
+   */
+  public abstract void obtainAuthTokenForJob(Configuration conf, Job job)
+      throws IOException, InterruptedException;
+
+  /**
+   * Requests an authentication token for this user and stores it in the
+   * user's credentials.
+   *
+   * @throws IOException
+   */
+  public abstract void obtainAuthTokenForJob(JobConf job)
+      throws IOException, InterruptedException;
+
+  @Override
+  public String toString() {
+    return ugi.toString();
+  }
+
+  /**
+   * Returns the {@code User} instance within current execution context.
+   */
+  public static User getCurrent() throws IOException {
+    User user = new SecureHadoopUser();
+    if (user.getUGI() == null) {
+      return null;
+    }
+    return user;
+  }
+
+  /**
+   * Wraps an underlying {@code UserGroupInformation} instance.
+   * @param ugi The base Hadoop user
+   * @return User
+   */
+  public static User create(UserGroupInformation ugi) {
+    if (ugi == null) {
+      return null;
+    }
+    return new SecureHadoopUser(ugi);
+  }
+
+  /**
+   * Generates a new {@code User} instance specifically for use in test code.
+   * @param name the full username
+   * @param groups the group names to which the test user will belong
+   * @return a new <code>User</code> instance
+   */
+  public static User createUserForTesting(Configuration conf,
+      String name, String[] groups) {
+    return SecureHadoopUser.createUserForTesting(conf, name, groups);
+  }
+
+  /**
+   * Log in the current process using the given configuration keys for the
+   * credential file and login principal.
+   *
+   * <p><strong>This is only applicable when
+   * running on secure Hadoop</strong> -- see
+   * org.apache.hadoop.security.SecurityUtil#login(Configuration,String,String,String).
+   * On regular Hadoop (without security features), this will safely be ignored.
+   * </p>
+   *
+   * @param conf The configuration data to use
+   * @param fileConfKey Property key used to configure path to the credential file
+   * @param principalConfKey Property key used to configure login principal
+   * @param localhost Current hostname to use in any credentials
+   * @throws IOException underlying exception from SecurityUtil.login() call
+   */
+  public static void login(Configuration conf, String fileConfKey,
+      String principalConfKey, String localhost) throws IOException {
+    SecureHadoopUser.login(conf, fileConfKey, principalConfKey, localhost);
+  }
+
+  /**
+   * Returns whether or not Kerberos authentication is configured for Hadoop.
+   * For non-secure Hadoop, this always returns <code>false</code>.
+   * For secure Hadoop, it will return the value from
+   * {@code UserGroupInformation.isSecurityEnabled()}.
+   */
+  public static boolean isSecurityEnabled() {
+    return SecureHadoopUser.isSecurityEnabled();
+  }
+
+  /**
+   * Returns whether or not secure authentication is enabled for HBase.  Note that
+   * HBase security requires HDFS security to provide any guarantees, so this requires that
+   * both <code>hbase.security.authentication</code> and <code>hadoop.security.authentication</code>
+   * are set to <code>kerberos</code>.
+   */
+  public static boolean isHBaseSecurityEnabled(Configuration conf) {
+    return "kerberos".equalsIgnoreCase(conf.get(HBASE_SECURITY_CONF_KEY)) &&
+        "kerberos".equalsIgnoreCase(
+            conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
+  }
+
+  /* Concrete implementations */
+
+  /**
+   * Bridges {@code User} invocations to underlying calls to
+   * {@link org.apache.hadoop.security.UserGroupInformation} for secure Hadoop
+   * 0.20 and versions 0.21 and above.
+   */
+  private static class SecureHadoopUser extends User {
+    private String shortName;
+
+    private SecureHadoopUser() throws IOException {
+      try {
+        ugi = (UserGroupInformation) callStatic("getCurrentUser");
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        throw new UndeclaredThrowableException(e,
+            "Unexpected exception getting current secure user");
+      }
+    }
+
+    private SecureHadoopUser(UserGroupInformation ugi) {
+      this.ugi = ugi;
+    }
+
+    @Override
+    public String getShortName() {
+      if (shortName != null) return shortName;
+
+      try {
+        shortName = (String)call(ugi, "getShortUserName", null, null);
+        return shortName;
+      } catch (RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        throw new UndeclaredThrowableException(e,
+            "Unexpected error getting user short name");
+      }
+    }
+
+    @Override
+    public <T> T runAs(PrivilegedAction<T> action) {
+      try {
+        return (T) call(ugi, "doAs", new Class[]{PrivilegedAction.class},
+            new Object[]{action});
+      } catch (RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        throw new UndeclaredThrowableException(e,
+            "Unexpected exception in runAs()");
+      }
+    }
+
+    @Override
+    public <T> T runAs(PrivilegedExceptionAction<T> action)
+        throws IOException, InterruptedException {
+      try {
+        return (T) call(ugi, "doAs",
+            new Class[]{PrivilegedExceptionAction.class},
+            new Object[]{action});
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (InterruptedException ie) {
+        throw ie;
+      } catch (RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        throw new UndeclaredThrowableException(e,
+            "Unexpected exception in runAs(PrivilegedExceptionAction)");
+      }
+    }
+
+    @Override
+    public void obtainAuthTokenForJob(Configuration conf, Job job)
+        throws IOException, InterruptedException {
+      try {
+        Class c = Class.forName(
+            "org.apache.hadoop.hbase.security.token.TokenUtil");
+        Methods.call(c, null, "obtainTokenForJob",
+            new Class[]{Configuration.class, UserGroupInformation.class,
+                Job.class},
+            new Object[]{conf, ugi, job});
+      } catch (ClassNotFoundException cnfe) {
+        throw new RuntimeException("Failure loading TokenUtil class, "
+            +"is secure RPC available?", cnfe);
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (InterruptedException ie) {
+        throw ie;
+      } catch (RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        throw new UndeclaredThrowableException(e,
+            "Unexpected error calling TokenUtil.obtainAndCacheToken()");
+      }
+    }
+
+    @Override
+    public void obtainAuthTokenForJob(JobConf job)
+        throws IOException, InterruptedException {
+      try {
+        Class c = Class.forName(
+            "org.apache.hadoop.hbase.security.token.TokenUtil");
+        Methods.call(c, null, "obtainTokenForJob",
+            new Class[]{JobConf.class, UserGroupInformation.class},
+            new Object[]{job, ugi});
+      } catch (ClassNotFoundException cnfe) {
+        throw new RuntimeException("Failure loading TokenUtil class, "
+            +"is secure RPC available?", cnfe);
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (InterruptedException ie) {
+        throw ie;
+      } catch (RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        throw new UndeclaredThrowableException(e,
+            "Unexpected error calling TokenUtil.obtainAndCacheToken()");
+      }
+    }
+
+    /** @see User#createUserForTesting(org.apache.hadoop.conf.Configuration, String, String[]) */
+    public static User createUserForTesting(Configuration conf,
+        String name, String[] groups) {
+      try {
+        return new SecureHadoopUser(
+            (UserGroupInformation)callStatic("createUserForTesting",
+                new Class[]{String.class, String[].class},
+                new Object[]{name, groups})
+        );
+      } catch (RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        throw new UndeclaredThrowableException(e,
+            "Error creating secure test user");
+      }
+    }
+
+    /**
+     * Obtain credentials for the current process using the configured
+     * Kerberos keytab file and principal.
+     * @see User#login(org.apache.hadoop.conf.Configuration, String, String, String)
+     *
+     * @param conf the Configuration to use
+     * @param fileConfKey Configuration property key used to store the path
+     * to the keytab file
+     * @param principalConfKey Configuration property key used to store the
+     * principal name to login as
+     * @param localhost the local hostname
+     */
+    public static void login(Configuration conf, String fileConfKey,
+        String principalConfKey, String localhost) throws IOException {
+      if (isSecurityEnabled()) {
+        // check for SecurityUtil class
+        try {
+          Class c = Class.forName("org.apache.hadoop.security.SecurityUtil");
+          Class[] types = new Class[]{
+              Configuration.class, String.class, String.class, String.class };
+          Object[] args = new Object[]{
+              conf, fileConfKey, principalConfKey, localhost };
+          Methods.call(c, null, "login", types, args);
+        } catch (ClassNotFoundException cnfe) {
+          throw new RuntimeException("Unable to login using " +
+              "org.apache.hadoop.security.SecurityUtil.login(). SecurityUtil class " +
+              "was not found!  Is this a version of secure Hadoop?", cnfe);
+        } catch (IOException ioe) {
+          throw ioe;
+        } catch (RuntimeException re) {
+          throw re;
+        } catch (Exception e) {
+          throw new UndeclaredThrowableException(e,
+              "Unhandled exception in User.login()");
+        }
+      }
+    }
+
+    /**
+     * Returns the result of {@code UserGroupInformation.isSecurityEnabled()}.
+     */
+    public static boolean isSecurityEnabled() {
+      try {
+        return (Boolean)callStatic("isSecurityEnabled");
+      } catch (RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        throw new UndeclaredThrowableException(e,
+            "Unexpected exception calling UserGroupInformation.isSecurityEnabled()");
+      }
+    }
+  }
+
+  /* Reflection helper methods */
+  private static Object callStatic(String methodName) throws Exception {
+    return call(null, methodName, null, null);
+  }
+
+  private static Object callStatic(String methodName, Class[] types,
+      Object[] args) throws Exception {
+    return call(null, methodName, types, args);
+  }
+
+  private static Object call(UserGroupInformation instance, String methodName,
+      Class[] types, Object[] args) throws Exception {
+    return Methods.call(UserGroupInformation.class, instance, methodName, types,
+        args);
+  }
+}
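
A minimal sketch of the User API from a caller's perspective; the action body is a placeholder, the configuration key names are illustrative, and conf and hostname are assumed to exist in the caller's scope:

    User user = User.getCurrent();
    String shortName = user.getShortName();   // OS-level portion of the user name

    byte[] row = user.runAs(new PrivilegedExceptionAction<byte[]>() {
      public byte[] run() throws Exception {
        // perform HBase operations with this user's credentials
        return null;
      }
    });

    // True only when BOTH hbase.security.authentication and
    // hadoop.security.authentication are set to "kerberos".
    if (User.isHBaseSecurityEnabled(conf)) {
      // illustrative key names; real deployments use their own configured keys
      User.login(conf, "example.keytab.file", "example.kerberos.principal", hostname);
    }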

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.VersionedWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Base permissions instance representing the ability to perform a given set
+ * of actions.
+ *
+ * @see TablePermission
+ */
+public class Permission extends VersionedWritable {
+  protected static final byte VERSION = 0;
+  public enum Action {
+    READ('R'), WRITE('W'), EXEC('X'), CREATE('C'), ADMIN('A');
+
+    private byte code;
+    Action(char code) {
+      this.code = (byte)code;
+    }
+
+    public byte code() { return code; }
+  }
+
+  private static final Log LOG = LogFactory.getLog(Permission.class);
+  protected static final Map<Byte,Action> ACTION_BY_CODE = Maps.newHashMap();
+
+  protected Action[] actions;
+
+  static {
+    for (Action a : Action.values()) {
+      ACTION_BY_CODE.put(a.code(), a);
+    }
+  }
+
+  /** Empty constructor for Writable implementation.  <b>Do not use.</b> */
+  public Permission() {
+    super();
+  }
+
+  public Permission(Action... assigned) {
+    if (assigned != null && assigned.length > 0) {
+      actions = Arrays.copyOf(assigned, assigned.length);
+    }
+  }
+
+  public Permission(byte[] actionCodes) {
+    if (actionCodes != null) {
+      Action[] acts = new Action[actionCodes.length];
+      int j = 0;
+      for (int i=0; i<actionCodes.length; i++) {
+        byte b = actionCodes[i];
+        Action a = ACTION_BY_CODE.get(b);
+        if (a == null) {
+          LOG.error("Ignoring unknown action code '"+
+              Bytes.toStringBinary(new byte[]{b})+"'");
+          continue;
+        }
+        acts[j++] = a;
+      }
+      this.actions = Arrays.copyOf(acts, j);
+    }
+  }
+
+  public Action[] getActions() {
+    return actions;
+  }
+
+  public boolean implies(Action action) {
+    if (this.actions != null) {
+      for (Action a : this.actions) {
+        if (a == action) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof Permission)) {
+      return false;
+    }
+    Permission other = (Permission)obj;
+    // check actions
+    if (actions == null && other.getActions() == null) {
+      return true;
+    } else if (actions != null && other.getActions() != null) {
+      Action[] otherActions = other.getActions();
+      if (actions.length != otherActions.length) {
+        return false;
+      }
+
+      outer:
+      for (Action a : actions) {
+        for (Action oa : otherActions) {
+          if (a == oa) continue outer;
+        }
+        return false;
+      }
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 37;
+    int result = 23;
+    // Guard against the Writable no-arg constructor case, where actions is null.
+    if (actions != null) {
+      for (Action a : actions) {
+        result = prime * result + a.code();
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder("[Permission: ")
+        .append("actions=");
+    if (actions != null) {
+      for (int i=0; i<actions.length; i++) {
+        if (i > 0)
+          str.append(",");
+        if (actions[i] != null)
+          str.append(actions[i].toString());
+        else
+          str.append("NULL");
+      }
+    }
+    str.append("]");
+
+    return str.toString();
+  }
+
+  /** @return the object version number */
+  public byte getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int length = (int)in.readByte();
+    if (length > 0) {
+      actions = new Action[length];
+      for (int i = 0; i < length; i++) {
+        byte b = in.readByte();
+        Action a = ACTION_BY_CODE.get(b);
+        if (a == null) {
+          throw new IOException("Unknown action code '"+
+              Bytes.toStringBinary(new byte[]{b})+"' in input");
+        }
+        this.actions[i] = a;
+      }
+    } else {
+      actions = new Action[0];
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeByte(actions != null ? actions.length : 0);
+    if (actions != null) {
+      for (Action a: actions) {
+        out.writeByte(a.code());
+      }
+    }
+  }
+}
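
A sketch of the byte-code constructor and the implies() check above; note that unknown action codes are logged and skipped rather than rejected:

    Permission p = new Permission(Bytes.toBytes("RW"));      // 'R','W' -> READ, WRITE
    boolean canRead  = p.implies(Permission.Action.READ);    // true
    boolean canAdmin = p.implies(Permission.Action.ADMIN);   // false

    Permission q = new Permission(Bytes.toBytes("RZ"));      // 'Z' is unknown: logged, dropped
    // q.getActions() == { READ }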


