hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anoopsamj...@apache.org
Subject hbase git commit: HBASE-12820 Use table lock instead of MobZookeeper.(Jingcheng Du)
Date Sat, 24 Jan 2015 06:07:25 GMT
Repository: hbase
Updated Branches:
  refs/heads/hbase-11339 1b800f7d4 -> fbbb3249d


HBASE-12820 Use table lock instead of MobZookeeper.(Jingcheng Du)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fbbb3249
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fbbb3249
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fbbb3249

Branch: refs/heads/hbase-11339
Commit: fbbb3249d9ef6aa5bf12ca2abbf67f4a89c86c18
Parents: 1b800f7
Author: anoopsjohn <anoopsamjohn@gmail.com>
Authored: Sat Jan 24 11:37:01 2015 +0530
Committer: anoopsjohn <anoopsamjohn@gmail.com>
Committed: Sat Jan 24 11:37:01 2015 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hbase/mob/MobConstants.java   |   1 +
 .../org/apache/hadoop/hbase/mob/MobUtils.java   |  21 +-
 .../apache/hadoop/hbase/mob/MobZookeeper.java   | 270 -------------------
 .../hadoop/hbase/mob/mapreduce/SweepJob.java    | 126 +++++----
 .../mob/mapreduce/SweepJobNodeTracker.java      |  56 ++--
 .../hadoop/hbase/mob/mapreduce/SweepMapper.java |   7 +-
 .../hbase/mob/mapreduce/SweepReducer.java       |  13 +-
 .../hadoop/hbase/regionserver/HMobStore.java    |  87 +++---
 .../hbase/mob/mapreduce/TestMobSweepMapper.java |  78 ++++--
 .../mob/mapreduce/TestMobSweepReducer.java      |  77 +++---
 .../hbase/mob/mapreduce/TestMobSweeper.java     |   4 +-
 .../hbase/regionserver/TestMobCompaction.java   |  56 ----
 12 files changed, 276 insertions(+), 520 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
index 7b0f9a0..f40c952 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -72,6 +72,7 @@ public class MobConstants {
   public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
 
   public final static String TEMP_DIR_NAME = ".tmp";
+  public final static byte[] MOB_TABLE_LOCK_SUFFIX = Bytes.toBytes(".mobLock");
   public final static String EMPTY_STRING = "";
   private MobConstants() {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index d0bb3ec..43521d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -274,16 +274,6 @@ public class MobUtils {
   }
 
   /**
-   * Gets the znode name of column family.
-   * @param tableName The current table name.
-   * @param familyName The name of the current column family.
-   * @return The znode name of column family.
-   */
-  public static String getColumnFamilyZNodeName(String tableName, String familyName) {
-    return tableName + ":" + familyName;
-  }
-
-  /**
    * Gets the root dir of the mob files.
    * It's {HBASE_DIR}/mobdir.
    * @param conf The current configuration.
@@ -548,4 +538,15 @@ public class MobUtils {
     return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
         cell.getValueLength() - Bytes.SIZEOF_INT);
   }
+
+  /**
+   * Gets the table name used in the table lock.
+   * The table lock name is a dummy one, it's not a table name. It's tableName + ".mobLock".
+   * @param tn The table name.
+   * @return The table name used in table lock.
+   */
+  public static TableName getTableLockName(TableName tn) {
+    byte[] tableName = tn.getName();
+    return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
deleted file mode 100644
index a9557d7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobZookeeper.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * The zookeeper used for MOB.
- * This zookeeper is used to synchronize the HBase major compaction and sweep tool.
- * The structure of the nodes for mob in zookeeper.
- * |--baseNode
- *     |--MOB
- *         |--tableName:columnFamilyName-lock // locks for the mob column family
- *         |--tableName:columnFamilyName-sweeper // when a sweep tool runs, such a node is added
- *         |--tableName:columnFamilyName-majorCompaction
- *              |--UUID //when a major compaction occurs, such a node is added.
- * In order to synchronize the operations between the sweep tool and HBase major compaction, these
- * actions need to acquire the tableName:columnFamilyName-lock before the sweep tool and major
- * compaction run.
- * In sweep tool.
- * 1. If it acquires the lock successfully, it checks whether the sweeper node exists; if it
- * does, the current run is aborted. If not, it checks whether there are major compaction nodes;
- * if so, the current run is aborted, otherwise it adds a sweep node to the zookeeper.
- * 2. If it could not obtain the lock, the current running is aborted.
- * In the HBase compaction.
- * 1. If it's a minor compaction, continue the compaction.
- * 2. If it's a major compaction, it acquires a lock in zookeeper.
- *    A. If it obtains the lock, it checks whether there's sweep node, if yes it converts itself
- *    to a minor one and continue, if no it adds a major compaction node to the zookeeper.
- *    B. If it could not obtain the lock, it converts itself to a minor one and continue the
- *    compaction.
- */
-@InterfaceAudience.Private
-public class MobZookeeper {
-  // TODO Will remove this class before the mob is merged back to master.
-  private static final Log LOG = LogFactory.getLog(MobZookeeper.class);
-
-  private ZooKeeperWatcher zkw;
-  private String mobZnode;
-  private static final String LOCK_EPHEMERAL = "-lock";
-  private static final String SWEEPER_EPHEMERAL = "-sweeper";
-  private static final String MAJOR_COMPACTION_EPHEMERAL = "-majorCompaction";
-
-  private MobZookeeper(Configuration conf, String identifier) throws IOException,
-      KeeperException {
-    this.zkw = new ZooKeeperWatcher(conf, identifier, new DummyMobAbortable());
-    mobZnode = ZKUtil.joinZNode(zkw.baseZNode, "MOB");
-    if (ZKUtil.checkExists(zkw, mobZnode) == -1) {
-      ZKUtil.createWithParents(zkw, mobZnode);
-    }
-  }
-
-  /**
-   * Creates a new instance of MobZookeeper.
-   * @param conf The current configuration.
-   * @param identifier string that is passed to RecoverableZookeeper to be used as
-   * identifier for this instance.
-   * @return A new instance of MobZookeeper.
-   * @throws IOException
-   * @throws KeeperException
-   */
-  public static MobZookeeper newInstance(Configuration conf, String identifier) throws IOException,
-      KeeperException {
-    return new MobZookeeper(conf, identifier);
-  }
-
-  /**
-   * Acquire a lock on the current column family.
-   * All the threads try to access the column family acquire a lock which is actually create an
-   * ephemeral node in the zookeeper.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @return True if the lock is obtained successfully. Otherwise false is returned.
-   */
-  public boolean lockColumnFamily(String tableName, String familyName) {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    boolean locked = false;
-    try {
-      locked = ZKUtil.createEphemeralNodeAndWatch(zkw,
-          ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL), null);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(locked ? "Locked the column family " + znodeName
-            : "Can not lock the column family " + znodeName);
-      }
-    } catch (KeeperException e) {
-      LOG.error("Fail to lock the column family " + znodeName, e);
-    }
-    return locked;
-  }
-
-  /**
-   * Release the lock on the current column family.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   */
-  public void unlockColumnFamily(String tableName, String familyName) {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Unlocking the column family " + znodeName);
-    }
-    try {
-      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL));
-    } catch (KeeperException e) {
-      LOG.warn("Fail to unlock the column family " + znodeName, e);
-    }
-  }
-
-  /**
-   * Adds a node to zookeeper which indicates that a sweep tool is running.
-   * @param tableName The current table name.
-   * @param familyName The current columnFamilyName name.
-   * @param data the data of the ephemeral node.
-   * @return True if the node is created successfully. Otherwise false is returned.
-   */
-  public boolean addSweeperZNode(String tableName, String familyName, byte[] data) {
-    boolean add = false;
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    try {
-      add = ZKUtil.createEphemeralNodeAndWatch(zkw,
-          ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL), data);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(add ? "Added a znode for sweeper " + znodeName
-            : "Cannot add a znode for sweeper " + znodeName);
-      }
-    } catch (KeeperException e) {
-      LOG.error("Fail to add a znode for sweeper " + znodeName, e);
-    }
-    return add;
-  }
-
-  /**
-   * Gets the path of the sweeper znode in zookeeper.
-   * @param tableName The current table name.
-   * @param familyName The current columnFamilyName name.
-   * @return The path of the sweeper znode in zookeeper.
-   */
-  public String getSweeperZNodePath(String tableName, String familyName) {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    return ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL);
-  }
-
-  /**
-   * Deletes the node from zookeeper which indicates that a sweep tool is finished.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   */
-  public void deleteSweeperZNode(String tableName, String familyName) {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    try {
-      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL));
-    } catch (KeeperException e) {
-      LOG.error("Fail to delete a znode for sweeper " + znodeName, e);
-    }
-  }
-
-  /**
-   * Checks whether the znode exists in the Zookeeper.
-   * If the node exists, it means a sweep tool is running.
-   * Otherwise, the sweep tool is not.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @return True if this node exists. Otherwise false is returned.
-   * @throws KeeperException
-   */
-  public boolean isSweeperZNodeExist(String tableName, String familyName) throws KeeperException {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    return ZKUtil.checkExists(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)) >= 0;
-  }
-
-  /**
-   * Checks whether there're major compactions nodes in the zookeeper.
-   * If there're such nodes, it means there're major compactions in progress now.
-   * Otherwise there're not.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @return True if there're major compactions in progress. Otherwise false is returned.
-   * @throws KeeperException
-   */
-  public boolean hasMajorCompactionChildren(String tableName, String familyName)
-      throws KeeperException {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
-    List<String> children = ZKUtil.listChildrenNoWatch(zkw, mcPath);
-    return children != null && !children.isEmpty();
-  }
-
-  /**
-   * Creates a node of a major compaction to the Zookeeper.
-   * Before a HBase major compaction, such a node is created to the Zookeeper. It tells others that
-   * there're major compaction in progress, the sweep tool could not be run at this time.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @param compactionName The current compaction name.
-   * @return True if the node is created successfully. Otherwise false is returned.
-   * @throws KeeperException
-   */
-  public boolean addMajorCompactionZNode(String tableName, String familyName,
-      String compactionName) throws KeeperException {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
-    ZKUtil.createNodeIfNotExistsAndWatch(zkw, mcPath, null);
-    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
-    return ZKUtil.createEphemeralNodeAndWatch(zkw, eachMcPath, null);
-  }
-
-  /**
-   * Deletes a major compaction node from the Zookeeper.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @param compactionName The current compaction name.
-   * @throws KeeperException
-   */
-  public void deleteMajorCompactionZNode(String tableName, String familyName,
-      String compactionName) throws KeeperException {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
-    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
-    ZKUtil.deleteNode(zkw, eachMcPath);
-  }
-
-  /**
-   * Closes the MobZookeeper.
-   */
-  public void close() {
-    this.zkw.close();
-  }
-
-  /**
-   * A dummy abortable. It's used for the MobZookeeper.
-   */
-  public static class DummyMobAbortable implements Abortable {
-
-    private boolean abort = false;
-
-    public void abort(String why, Throwable e) {
-      abort = true;
-    }
-
-    public boolean isAborted() {
-      return abort;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
index 8caa3b0..1c8bad7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mob.mapreduce;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -37,32 +38,39 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.serializer.JavaSerialization;
 import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zookeeper.KeeperException;
@@ -77,21 +85,23 @@ public class SweepJob {
   private final FileSystem fs;
   private final Configuration conf;
   private static final Log LOG = LogFactory.getLog(SweepJob.class);
-  static final String SWEEP_JOB_ID = "mob.compaction.id";
-  static final String SWEEPER_NODE = "mob.compaction.sweep.node";
-  static final String WORKING_DIR_KEY = "mob.compaction.dir";
-  static final String WORKING_ALLNAMES_FILE_KEY = "mob.compaction.all.file";
-  static final String WORKING_VISITED_DIR_KEY = "mob.compaction.visited.dir";
+  static final String SWEEP_JOB_ID = "mob.sweep.job.id";
+  static final String SWEEP_JOB_SERVERNAME = "mob.sweep.job.servername";
+  static final String SWEEP_JOB_TABLE_NODE = "mob.sweep.job.table.node";
+  static final String WORKING_DIR_KEY = "mob.sweep.job.dir";
+  static final String WORKING_ALLNAMES_FILE_KEY = "mob.sweep.job.all.file";
+  static final String WORKING_VISITED_DIR_KEY = "mob.sweep.job.visited.dir";
   static final String WORKING_ALLNAMES_DIR = "all";
   static final String WORKING_VISITED_DIR = "visited";
-  public static final String WORKING_FILES_DIR_KEY = "mob.compaction.files.dir";
-  //the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing.
-  public static final String MOB_COMPACTION_DELAY = "hbase.mob.compaction.delay";
+  public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir";
+  //the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
+  public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay";
   protected static long ONE_DAY = 24 * 60 * 60 * 1000;
   private long compactionStartTime = EnvironmentEdgeManager.currentTime();
   public final static String CREDENTIALS_LOCATION = "credentials_location";
   private CacheConfig cacheConfig;
   static final int SCAN_CACHING = 10000;
+  private TableLockManager tableLockManager;
 
   public SweepJob(Configuration conf, FileSystem fs) {
     this.conf = conf;
@@ -102,6 +112,22 @@ public class SweepJob {
     cacheConfig = new CacheConfig(copyOfConf);
   }
 
+  static ServerName getCurrentServerName(Configuration conf) throws IOException {
+    String hostname = conf.get(
+        "hbase.regionserver.ipc.address",
+        Strings.domainNamePointerToHostName(DNS.getDefaultHost(
+            conf.get("hbase.regionserver.dns.interface", "default"),
+            conf.get("hbase.regionserver.dns.nameserver", "default"))));
+    int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
+    // Creation of an InetSocketAddress will force a resolve.
+    InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
+    if (initialIsa.getAddress() == null) {
+      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
+    }
+    return ServerName.valueOf(initialIsa.getHostName(), initialIsa.getPort(),
+        EnvironmentEdgeManager.currentTime());
+  }
+
   /**
    * Runs MapReduce to do the sweeping on the mob files.
    * There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob
@@ -141,37 +167,21 @@ public class SweepJob {
     }
     String familyName = family.getNameAsString();
     String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
-    MobZookeeper zk = MobZookeeper.newInstance(conf, id);
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, id, new DummyMobAbortable());
     try {
-      // Try to obtain the lock. Use this lock to synchronize all the query, creation/deletion
-      // in the Zookeeper.
-      if (!zk.lockColumnFamily(tn.getNameAsString(), familyName)) {
-        LOG.warn("Can not lock the store " + familyName
-            + ". The major compaction in HBase may be in-progress. Please re-run the job.");
-        return 3;
-      }
+      ServerName serverName = getCurrentServerName(conf);
+      tableLockManager = TableLockManager.createTableLockManager(conf, zkw, serverName);
+      TableName lockName = MobUtils.getTableLockName(tn);
+      TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+      String tableName = tn.getNameAsString();
+      // Try to obtain the table lock. It keeps the sweep tool from running concurrently
+      // with an HBase major compaction or another sweep job.
       try {
-        // Checks whether there're HBase major compaction now.
-        boolean hasChildren = zk.hasMajorCompactionChildren(tn.getNameAsString(), familyName);
-        if (hasChildren) {
-          LOG.warn("The major compaction in HBase may be in-progress."
-              + " Please re-run the job.");
-          return 4;
-        } else {
-          // Checks whether there's sweep tool in progress.
-          boolean hasSweeper = zk.isSweeperZNodeExist(tn.getNameAsString(), familyName);
-          if (hasSweeper) {
-            LOG.warn("Another sweep job is running");
-            return 5;
-          } else {
-            // add the sweeper node, mark that there's one sweep tool in progress.
-            // All the HBase major compaction and sweep tool in this column family could not
-            // run until this sweep tool is finished.
-            zk.addSweeperZNode(tn.getNameAsString(), familyName, Bytes.toBytes(id));
-          }
-        }
-      } finally {
-        zk.unlockColumnFamily(tn.getNameAsString(), familyName);
+        lock.acquire();
+      } catch (Exception e) {
+        LOG.warn("Can not lock the table " + tableName
+            + ". The major compaction in HBase may be in-progress or another sweep job is running."
+            + " Please re-run the job.");
+        return 3;
       }
       Job job = null;
       try {
@@ -186,7 +196,9 @@ public class SweepJob {
         conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
             JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
         conf.set(SWEEP_JOB_ID, id);
-        conf.set(SWEEPER_NODE, zk.getSweeperZNodePath(tn.getNameAsString(), familyName));
+        conf.set(SWEEP_JOB_SERVERNAME, serverName.toString());
+        String tableLockNode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
+        conf.set(SWEEP_JOB_TABLE_NODE, tableLockNode);
         job = prepareJob(tn, familyName, scan, conf);
         job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
         // Record the compaction start time.
@@ -204,14 +216,21 @@ public class SweepJob {
           removeUnusedFiles(job, tn, family);
         } else {
           System.err.println("Job Failed");
-          return 6;
+          return 4;
         }
       } finally {
-        cleanup(job, tn, familyName);
-        zk.deleteSweeperZNode(tn.getNameAsString(), familyName);
+        try {
+          cleanup(job, tn, familyName);
+        } finally {
+          try {
+            lock.release();
+          } catch (IOException e) {
+            LOG.error("Fail to release the table lock " + tableName, e);
+          }
+        }
       }
     } finally {
-      zk.close();
+      zkw.close();
     }
     return 0;
   }
@@ -305,7 +324,7 @@ public class SweepJob {
     // archive them.
     FileStatus[] files = fs.listStatus(mobStorePath);
     Set<String> fileNames = new TreeSet<String>();
-    long mobCompactionDelay = job.getConfiguration().getLong(MOB_COMPACTION_DELAY, ONE_DAY);
+    long mobCompactionDelay = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, ONE_DAY);
     for (FileStatus fileStatus : files) {
       if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
         if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
@@ -422,9 +441,8 @@ public class SweepJob {
    * Deletes the working directory.
    * @param job The current job.
    * @param familyName The family to cleanup
-   * @throws IOException
    */
-  private void cleanup(Job job, TableName tn, String familyName) throws IOException {
+  private void cleanup(Job job, TableName tn, String familyName) {
     if (job != null) {
       // delete the working directory
       Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
@@ -563,4 +581,18 @@ public class SweepJob {
      */
     RECORDS_UPDATED,
   }
+
+  public static class DummyMobAbortable implements Abortable {
+
+    private boolean abort = false;
+
+    public void abort(String why, Throwable e) {
+      abort = true;
+    }
+
+    public boolean isAborted() {
+      return abort;
+    }
+
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
index b789332..7844359 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
@@ -18,8 +18,14 @@
  */
 package org.apache.hadoop.hbase.mob.mapreduce;
 
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -36,38 +42,58 @@ import org.apache.zookeeper.KeeperException;
 @InterfaceAudience.Private
 public class SweepJobNodeTracker extends ZooKeeperListener {
 
-  private String node;
-  private String sweepJobId;
+  private String parentNode;
+  private String lockNodePrefix;
+  private String owner;
+  private String lockNode;
 
-  public SweepJobNodeTracker(ZooKeeperWatcher watcher, String node, String sweepJobId) {
+  public SweepJobNodeTracker(ZooKeeperWatcher watcher, String parentNode, String owner) {
     super(watcher);
-    this.node = node;
-    this.sweepJobId = sweepJobId;
+    this.parentNode = parentNode;
+    this.owner = owner;
+    this.lockNodePrefix = ZKUtil.joinZNode(parentNode, "write-");
   }
 
   /**
    * Registers the watcher on the sweep job node.
    * If there's no such sweep job node, or it was not created by the sweep job that
    * owns the current MR, the current process will be aborted.
+   * This assumes the table lock uses the Zookeeper. It's a workaround and only used
+   * in the sweep tool, and the sweep tool will be removed after the mob file compaction
+   * is finished.
    */
   public void start() throws KeeperException {
     watcher.registerListener(this);
-    if (ZKUtil.watchAndCheckExists(watcher, node)) {
-      byte[] data = ZKUtil.getDataAndWatch(watcher, node);
-      if (data != null) {
-        if (!sweepJobId.equals(Bytes.toString(data))) {
-          System.exit(1);
+    List<String> children = ZKUtil.listChildrenNoWatch(watcher, parentNode);
+    if (children != null && !children.isEmpty()) {
+      // there are locks
+      TreeSet<String> sortedChildren = new TreeSet<String>();
+      sortedChildren.addAll(children);
+      // find all the write locks
+      SortedSet<String> tails = sortedChildren.tailSet(lockNodePrefix);
+      if (!tails.isEmpty()) {
+        for (String tail : tails) {
+          String path = ZKUtil.joinZNode(parentNode, tail);
+          byte[] data = ZKUtil.getDataAndWatch(watcher, path);
+          TableLock lock = TableLockManager.fromBytes(data);
+          ServerName serverName = lock.getLockOwner();
+          org.apache.hadoop.hbase.ServerName sn = org.apache.hadoop.hbase.ServerName.valueOf(
+              serverName.getHostName(), serverName.getPort(), serverName.getStartCode());
+          // compare the server names (host, port and start code) to make sure the lock
+          // was created by the owner of this sweep job
+          if (owner.equals(sn.toString())) {
+            lockNode = path;
+            return;
+          }
         }
       }
-    } else {
-      System.exit(1);
     }
+    System.exit(1);
   }
 
   @Override
   public void nodeDeleted(String path) {
-    // If the ephemeral node is deleted, abort the current process.
-    if (node.equals(path)) {
+    // If the lock node is deleted, abort the current process.
+    if (path.equals(lockNode)) {
       System.exit(1);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
index f508b93..56e5726 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
@@ -45,11 +45,12 @@ public class SweepMapper extends TableMapper<Text, KeyValue> {
   protected void setup(Context context) throws IOException,
       InterruptedException {
     String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
-    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
     zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
         new DummyMobAbortable());
     try {
-      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, id);
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
       tracker.start();
     } catch (KeeperException e) {
       throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
index 9fd5750..73ca1a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobFile;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
@@ -102,8 +102,8 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
   protected void setup(Context context) throws IOException, InterruptedException {
     this.conf = context.getConfiguration();
     this.fs = FileSystem.get(conf);
-    // the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing.
-    mobCompactionDelay = conf.getLong(SweepJob.MOB_COMPACTION_DELAY, SweepJob.ONE_DAY);
+    // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
+    mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY);
     String tableName = conf.get(TableInputFormat.INPUT_TABLE);
     String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
     TableName tn = TableName.valueOf(tableName);
@@ -125,7 +125,7 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
     }
     // disable the block cache.
     Configuration copyOfConf = new Configuration(conf);
-    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.00001f);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
     this.cacheConfig = new CacheConfig(copyOfConf);
 
     table = new HTable(this.conf, Bytes.toBytes(tableName));
@@ -148,12 +148,13 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
   @Override
   public void run(Context context) throws IOException, InterruptedException {
     String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
-    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
         new DummyMobAbortable());
     FSDataOutputStream fout = null;
     try {
-      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, jobId);
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
       tracker.start();
       setup(context);
       // create a sequence contains all the visited file names in this reducer.

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 218a4ef..3c8fa87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -46,17 +46,17 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.mob.MobCacheConfig;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobFile;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobStoreEngine;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * The store implementation to save MOBs (medium objects), it extends the HStore.
@@ -91,6 +91,8 @@ public class HMobStore extends HStore {
   private volatile long mobScanCellsSize = 0;
   private List<Path> mobDirLocations;
   private HColumnDescriptor family;
+  private TableLockManager tableLockManager;
+  private TableName tableLockName;
 
   public HMobStore(final HRegion region, final HColumnDescriptor family,
       final Configuration confParam) throws IOException {
@@ -105,6 +107,10 @@ public class HMobStore extends HStore {
     TableName tn = region.getTableDesc().getTableName();
     mobDirLocations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
         .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
+    if (region.getRegionServerServices() != null) {
+      tableLockManager = region.getRegionServerServices().getTableLockManager();
+      tableLockName = MobUtils.getTableLockName(getTableName());
+    }
   }
 
   /**
@@ -425,59 +431,40 @@ public class HMobStore extends HStore {
       //             compaction as retainDeleteMarkers and continue the compaction.
       //      1.2.2. If the node is not there, add a child to the major compaction node, and
       //             run the compaction directly.
-      String compactionName = UUID.randomUUID().toString().replaceAll("-", "");
-      MobZookeeper zk = null;
-      try {
-        zk = MobZookeeper.newInstance(region.getBaseConf(), compactionName);
-      } catch (KeeperException e) {
-        LOG.error("Cannot connect to the zookeeper, forcing the delete markers to be retained", e);
-        compaction.getRequest().forceRetainDeleteMarkers();
-        return super.compact(compaction);
+      TableLock lock = null;
+      if (tableLockManager != null) {
+        lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore");
+      }
+      boolean tableLocked = false;
+      String tableName = getTableName().getNameAsString();
+      if (lock != null) {
+        try {
+          LOG.info("Start to acquire a read lock for the table[" + tableName
+              + "], ready to perform the major compaction");
+          lock.acquire();
+          tableLocked = true;
+        } catch (Exception e) {
+          LOG.error("Fail to lock the table " + tableName, e);
+        }
+      } else {
+        // If the tableLockManager is null, mark the tableLocked as true.
+        tableLocked = true;
       }
-      boolean keepDeleteMarkers = true;
-      boolean majorCompactNodeAdded = false;
       try {
-        // try to acquire the operation lock.
-        if (zk.lockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString())) {
-          try {
-            LOG.info("Obtain the lock for the store[" + this
-                + "], ready to perform the major compaction");
-            // check the sweeping node to find out whether the sweeping is in progress.
-            boolean hasSweeper = zk.isSweeperZNodeExist(getTableName().getNameAsString(),
-                getFamily().getNameAsString());
-            if (!hasSweeper) {
-              // if not, add a child to the major compaction node of this store.
-              majorCompactNodeAdded = zk.addMajorCompactionZNode(getTableName().getNameAsString(),
-                  getFamily().getNameAsString(), compactionName);
-              // If we failed to add the major compact node, go with keep delete markers mode.
-              keepDeleteMarkers = !majorCompactNodeAdded;
-            }
-          } catch (Exception e) {
-            LOG.error("Fail to handle the Zookeeper", e);
-          } finally {
-            // release the operation lock
-            zk.unlockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString());
-          }
+        if (!tableLocked) {
+          LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table["
+              + tableName + "], forcing the delete markers to be retained");
+          compaction.getRequest().forceRetainDeleteMarkers();
         }
-        try {
-          if (keepDeleteMarkers) {
-            LOG.warn("Cannot obtain the lock or a sweep tool is running on this store[" + this
-                + "], forcing the delete markers to be retained");
-            compaction.getRequest().forceRetainDeleteMarkers();
-          }
-          return super.compact(compaction);
-        } finally {
-          if (majorCompactNodeAdded) {
-            try {
-              zk.deleteMajorCompactionZNode(getTableName().getNameAsString(), getFamily()
-                  .getNameAsString(), compactionName);
-            } catch (KeeperException e) {
-              LOG.error("Fail to delete the compaction znode" + compactionName, e);
-            }
+        return super.compact(compaction);
+      } finally {
+        if (tableLocked && lock != null) {
+          try {
+            lock.release();
+          } catch (IOException e) {
+            LOG.error("Fail to release the table lock " + tableName, e);
           }
         }
-      } finally {
-        zk.close();
       }
     } else {
       // If it's not a major compaction, continue the compaction.

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
index a7e2538..2aa3a4a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
@@ -17,15 +17,27 @@
  */
 package org.apache.hadoop.hbase.mob.mapreduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mob.MobZookeeper;
-import org.apache.hadoop.hbase.mob.mapreduce.SweepMapper;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.junit.AfterClass;
@@ -35,9 +47,6 @@ import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-
 @Category(SmallTests.class)
 public class TestMobSweepMapper {
 
@@ -71,30 +80,41 @@ public class TestMobSweepMapper {
     when(columns.raw()).thenReturn(kvList);
 
     Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
+    TableName tn = TableName.valueOf("testSweepMapper");
+    TableName lockName = MobUtils.getTableLockName(tn);
+    String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
     configuration.set(SweepJob.SWEEP_JOB_ID, "1");
-    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepMapper:family-sweeper");
-
-    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
-    zk.addSweeperZNode("testSweepMapper", "family", Bytes.toBytes("1"));
-
-    Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
-            mock(Mapper.Context.class);
-    when(ctx.getConfiguration()).thenReturn(configuration);
-    SweepMapper map = new SweepMapper();
-    doAnswer(new Answer<Void>() {
-
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        Text text = (Text) invocation.getArguments()[0];
-        KeyValue kv = (KeyValue) invocation.getArguments()[1];
-
-        assertEquals(Bytes.toString(text.getBytes(), 0, text.getLength()), fileName);
-        assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
-
-        return null;
-      }
-    }).when(ctx).write(any(Text.class), any(KeyValue.class));
-
-    map.map(r, columns, ctx);
+    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
+    ServerName serverName = SweepJob.getCurrentServerName(configuration);
+    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
+
+    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
+        serverName);
+    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+    lock.acquire();
+    try {
+      Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
+        mock(Mapper.Context.class);
+      when(ctx.getConfiguration()).thenReturn(configuration);
+      SweepMapper map = new SweepMapper();
+      doAnswer(new Answer<Void>() {
+
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          Text text = (Text) invocation.getArguments()[0];
+          KeyValue kv = (KeyValue) invocation.getArguments()[1];
+
+          assertEquals(Bytes.toString(text.getBytes(), 0, text.getLength()), fileName);
+          assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
+
+          return null;
+        }
+      }).when(ctx).write(any(Text.class), any(KeyValue.class));
+
+      map.map(r, columns, ctx);
+    } finally {
+      lock.release();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
index a45ed34..1a69d06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
@@ -36,16 +36,21 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -126,11 +131,11 @@ public class TestMobSweepReducer {
   @Test
   public void testRun() throws Exception {
 
+    TableName tn = TableName.valueOf(tableName);
     byte[] mobValueBytes = new byte[100];
 
     //get the path where mob files lie in
-    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(),
-    TableName.valueOf(tableName), family);
+    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
 
     Put put = new Put(Bytes.toBytes(row));
     put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
@@ -139,7 +144,7 @@ public class TestMobSweepReducer {
     table.put(put);
     table.put(put2);
     table.flushCommits();
-    admin.flush(TableName.valueOf(tableName));
+    admin.flush(tn);
 
     FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
     //check the generation of a mob file
@@ -159,34 +164,42 @@ public class TestMobSweepReducer {
     configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
         System.currentTimeMillis() + 24 * 3600 * 1000);
 
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
+    TableName lockName = MobUtils.getTableLockName(tn);
+    String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
     configuration.set(SweepJob.SWEEP_JOB_ID, "1");
-    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepReducer:family-sweeper");
-
-    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
-    zk.addSweeperZNode(tableName, family, Bytes.toBytes("1"));
-
-    //use the same counter when mocking
-    Counter counter = new GenericCounter();
-    Reducer<Text, KeyValue, Writable, Writable>.Context ctx =
-            mock(Reducer.Context.class);
-    when(ctx.getConfiguration()).thenReturn(configuration);
-    when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
-    when(ctx.nextKey()).thenReturn(true).thenReturn(false);
-    when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
-
-    byte[] refBytes = Bytes.toBytes(mobFile1);
-    long valueLength = refBytes.length;
-    byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
-    KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family),
-            Bytes.toBytes(qf), 1, KeyValue.Type.Put, newValue);
-    List<KeyValue> list = new ArrayList<KeyValue>();
-    list.add(kv2);
-
-    when(ctx.getValues()).thenReturn(list);
-
-    SweepReducer reducer = new SweepReducer();
-    reducer.run(ctx);
-
+    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
+    ServerName serverName = SweepJob.getCurrentServerName(configuration);
+    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
+
+    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
+        serverName);
+    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+    lock.acquire();
+    try {
+      // use the same counter when mocking
+      Counter counter = new GenericCounter();
+      Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
+      when(ctx.getConfiguration()).thenReturn(configuration);
+      when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
+      when(ctx.nextKey()).thenReturn(true).thenReturn(false);
+      when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
+
+      byte[] refBytes = Bytes.toBytes(mobFile1);
+      long valueLength = refBytes.length;
+      byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
+      KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
+        KeyValue.Type.Put, newValue);
+      List<KeyValue> list = new ArrayList<KeyValue>();
+      list.add(kv2);
+
+      when(ctx.getValues()).thenReturn(list);
+
+      SweepReducer reducer = new SweepReducer();
+      reducer.run(ctx);
+    } finally {
+      lock.release();
+    }
     FileStatus[] filsStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
     String mobFile2 = filsStatuses2[0].getPath().getName();
     //new mob file is generated, old one has been archived
@@ -194,7 +207,7 @@ public class TestMobSweepReducer {
     assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
 
     //test sequence file
-    String workingPath = configuration.get("mob.compaction.visited.dir");
+    String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
     FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
     Set<String> files = new TreeSet<String>();
     for (FileStatus st : statuses) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
index 2021bd8..c4817aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweeper.java
@@ -180,7 +180,7 @@ public class TestMobSweeper {
 
 
     Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 24 * 60 * 60 * 1000);
+    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 24 * 60 * 60 * 1000);
 
     String[] args = new String[2];
     args[0] = tableName;
@@ -260,7 +260,7 @@ public class TestMobSweeper {
 
 
     Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 0);
+    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 0);
 
     String[] args = new String[2];
     args[0] = tableName;

http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbb3249/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
index fb85e87..2d68cd1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
@@ -224,62 +224,6 @@ public class TestMobCompaction {
         countMobCellsInMetadata());
   }
 
-  /**
-   * Tests the major compaction when the zk is not connected.
-   * After that the major compaction will be marked as retainDeleteMarkers, the delete marks
-   * will be retained.
-   * @throws Exception
-   */
-  @Test
-  public void testMajorCompactionWithZKError() throws Exception {
-    Configuration conf = new Configuration(UTIL.getConfiguration());
-    // use the wrong zk settings
-    conf.setInt("zookeeper.recovery.retry", 0);
-    conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 100);
-    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT,
-        conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181) - 1);
-    init(conf, 200);
-    byte[] dummyData = makeDummyData(300); // larger than mob threshold
-    HRegionIncommon loader = new HRegionIncommon(region);
-    byte[] deleteRow = Bytes.toBytes(0);
-    for (int i = 0; i < compactionThreshold - 1 ; i++) {
-      Put p = new Put(Bytes.toBytes(i));
-      p.setDurability(Durability.SKIP_WAL);
-      p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
-      loader.put(p);
-      loader.flushcache();
-    }
-    Delete delete = new Delete(deleteRow);
-    delete.deleteFamily(COLUMN_FAMILY);
-    region.delete(delete);
-    loader.flushcache();
-
-    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
-    region.compactStores(true);
-    assertEquals("After compaction: store files", 1, countStoreFiles());
-
-    Scan scan = new Scan();
-    scan.setRaw(true);
-    InternalScanner scanner = region.getScanner(scan);
-    List<Cell> results = new ArrayList<Cell>();
-    scanner.next(results);
-    int deleteCount = 0;
-    while (!results.isEmpty()) {
-      for (Cell c : results) {
-        if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
-          deleteCount++;
-          assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow));
-        }
-      }
-      results.clear();
-      scanner.next(results);
-    }
-    // assert the delete mark is retained, the major compaction is marked as
-    // retainDeleteMarkers.
-    assertEquals(1, deleteCount);
-    scanner.close();
-  }
-
   @Test
   public void testMajorCompactionAfterDelete() throws Exception {
     init(UTIL.getConfiguration(), 100);


Mime
View raw message