hbase-commits mailing list archives

From: niuyu...@apache.org
Subject: [hbase] branch HBASE-25714 updated: HBASE-25830 HBaseCluster support CompactionServer for UTs (#3266)
Date: Tue, 01 Jun 2021 02:25:51 GMT
This is an automated email from the ASF dual-hosted git repository.

niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-25714 by this push:
     new 997af02  HBASE-25830 HBaseCluster support CompactionServer for UTs (#3266)
997af02 is described below

commit 997af0215c50d4be3eb775838a1bb9bc246b0b4d
Author: niuyulin <yulin.niu.2016@gmail.com>
AuthorDate: Tue Jun 1 10:25:05 2021 +0800

    HBASE-25830 HBaseCluster support CompactionServer for UTs (#3266)
    
    Signed-off-by: zhangduo <zhangduo@apache.org>
---
 .../testclassification/CompactionServerTests.java  |  21 ++++
 .../org/apache/hadoop/hbase/AbstractServer.java    |  31 ++++++
 .../org/apache/hadoop/hbase/LocalHBaseCluster.java |  56 ++++++++++-
 .../hbase/compactionserver/HCompactionServer.java  |  21 +++-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  26 +----
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   4 +-
 .../apache/hadoop/hbase/util/JVMClusterUtil.java   |  57 ++++++++++-
 .../apache/hadoop/hbase/HBaseTestingUtility.java   |   9 +-
 .../org/apache/hadoop/hbase/MiniHBaseCluster.java  |  86 +++++++++++++---
 .../hadoop/hbase/StartMiniClusterOption.java       |  34 +++++--
 .../compactionserver/TestCompactionServer.java     | 108 +++++++++++++++++++++
 pom.xml                                            |  16 +++
 12 files changed, 410 insertions(+), 59 deletions(-)
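
For orientation before the diff: pieced together from the hunks below, this is
roughly how a unit test can now ask the mini cluster for a compaction server.
A minimal, test-scope sketch only; the println stands in for real assertions,
everything else is named in the patch.

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.compactionserver.HCompactionServer;

public class CompactionServerMiniClusterSketch {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    // New option: ask for one compaction server next to the default master
    // and region server.
    util.startMiniCluster(
      StartMiniClusterOption.builder().numCompactionServers(1).build());
    // New MiniHBaseCluster accessor: grab the compaction server instance.
    HCompactionServer cs = util.getMiniHBaseCluster()
      .getCompactionServerThreads().get(0).getCompactionServer();
    // Pulled up into AbstractServer: blocks until the server has reported in.
    cs.waitForServerOnline();
    System.out.println("live compaction servers: "
      + util.getMiniHBaseCluster().getNumLiveCompactionServers());
    util.shutdownMiniCluster();
  }
}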

diff --git a/hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/CompactionServerTests.java b/hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/CompactionServerTests.java
new file mode 100644
index 0000000..110223b
--- /dev/null
+++ b/hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/CompactionServerTests.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.testclassification;
+
+public interface CompactionServerTests {
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
index 1fa68e6..208bd86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
@@ -98,6 +99,8 @@ public abstract class AbstractServer extends Thread implements Server {
   // of AbstractServer in isolation.
   protected volatile boolean stopped = false;
 
+  // flag set after we're done setting up server threads
+  protected final AtomicBoolean online = new AtomicBoolean(false);
   // master address tracker
   protected MasterAddressTracker masterAddressTracker;
   // Cluster Status Tracker
@@ -115,6 +118,10 @@ public abstract class AbstractServer extends Thread implements Server {
     }
   }
 
+  public AtomicBoolean getOnline() {
+    return online;
+  }
+
   @Override
   public ServerName getServerName() {
     return this.serverName;
@@ -369,6 +376,30 @@ public abstract class AbstractServer extends Thread implements Server {
     return this.stopped;
   }
 
+  /**
+   * Report the status of the server. A server is online once all the startup is
+   * completed (setting up filesystem, starting executorService threads, etc.). This
+   * method is designed mostly to be useful in tests.
+   *
+   * @return true if online, false if not.
+   */
+  public boolean isOnline() {
+    return online.get();
+  }
+
+  public void waitForServerOnline(){
+    while (!isStopped() && !isOnline()) {
+      synchronized (online) {
+        try {
+          online.wait(msgInterval);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+
   protected abstract AbstractRpcServices getRpcService();
 
   protected abstract String getProcessName();
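
The online flag, isOnline(), and waitForServerOnline() move up from HRegionServer
so HCompactionServer can reuse them. Below is a consumer-side sketch of the same
wait pattern the RSRpcServices hunk further down uses; the 200 ms poll interval
is an illustrative stand-in for the server's msgInterval.

import org.apache.hadoop.hbase.AbstractServer;

final class OnlineWaitSketch {
  // Works against any AbstractServer subclass (region server or compaction server).
  static void waitUntilOnline(AbstractServer server, long timeoutMs)
      throws InterruptedException {
    long endTime = System.currentTimeMillis() + timeoutMs;
    synchronized (server.getOnline()) {
      while (System.currentTimeMillis() <= endTime
          && !server.isStopped() && !server.isOnline()) {
        // Woken early by the notifyAll() the server issues when it flips the flag.
        server.getOnline().wait(200);
      }
    }
  }
}
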
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index f4847b9..ec55182 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.compactionserver.HCompactionServer;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.security.User;
@@ -61,6 +62,8 @@ public class LocalHBaseCluster {
   private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class);
   private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>();
   private final List<JVMClusterUtil.RegionServerThread> regionThreads = new CopyOnWriteArrayList<>();
+  private final List<JVMClusterUtil.CompactionServerThread> compactionServerThreads =
+      new CopyOnWriteArrayList<>();
   private final static int DEFAULT_NO = 1;
   /** local mode */
   public static final String LOCAL = "local";
@@ -426,11 +429,62 @@ public class LocalHBaseCluster {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  public JVMClusterUtil.CompactionServerThread addCompactionServer(
+    Configuration config, final int index) throws IOException {
+    // Create each compaction server with its own Configuration instance so each has
+    // its Connection instance rather than share (see HBASE_INSTANCES down in
+    // the guts of ConnectionManager).
+    JVMClusterUtil.CompactionServerThread cst =
+      JVMClusterUtil.createCompactionServerThread(config, index);
+    this.compactionServerThreads.add(cst);
+    return cst;
+  }
+
+  JVMClusterUtil.CompactionServerThread addCompactionServer(final Configuration config,
+      final int index, User user) throws IOException, InterruptedException {
+    return user.runAs(
+      (PrivilegedExceptionAction<JVMClusterUtil.CompactionServerThread>) () -> addCompactionServer(
+        config, index));
+  }
+
+  /**
+   * @param serverNumber compaction server number
+   * @return compaction server
+   */
+  HCompactionServer getCompactionServer(int serverNumber) {
+    return compactionServerThreads.get(serverNumber).getCompactionServer();
+  }
+
+  /**
+   * @return Read-only list of compaction server threads.
+   */
+  List<JVMClusterUtil.CompactionServerThread> getCompactionServers() {
+    return Collections.unmodifiableList(this.compactionServerThreads);
+  }
+
+  /**
+   * @return List of running servers (Some servers may have been killed or aborted during lifetime
+   *         of cluster; these servers are not included in this list).
+   */
+  List<JVMClusterUtil.CompactionServerThread> getLiveCompactionServers() {
+    List<JVMClusterUtil.CompactionServerThread> liveServers = new ArrayList<>();
+    List<JVMClusterUtil.CompactionServerThread> list = getCompactionServers();
+    for (JVMClusterUtil.CompactionServerThread cst : list) {
+      if (cst.isAlive()) {
+        liveServers.add(cst);
+      } else {
+        LOG.info("Not alive {}", cst.getName());
+      }
+    }
+    return liveServers;
+  }
+
   /**
    * Start the cluster.
    */
   public void startup() throws IOException {
-    JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
+    JVMClusterUtil.startup(this.masterThreads, this.regionThreads, this.compactionServerThreads);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
index 6a689f9..88a3176 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -105,8 +106,11 @@ public class HCompactionServer extends AbstractServer {
     if (!this.masterless) {
       masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
       masterAddressTracker.start();
+      clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
+      clusterStatusTracker.start();
     } else {
       masterAddressTracker = null;
+      clusterStatusTracker = null;
     }
 
     ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
@@ -174,13 +178,28 @@ public class HCompactionServer extends AbstractServer {
       abort("Fatal exception during initialization", e);
     }
     try {
+      if (!isStopped() && !isAborted()) {
+        while (keepLooping()) {
+          createCompactionServerStatusStub();
+          if (cssStub == null) {
+            this.sleeper.sleep(100);
+          } else {
+            break;
+          }
+        }
+      }
       // We registered with the Master. Go into run mode.
       long lastMsg = System.currentTimeMillis();
       // The main run loop.
       while (!isStopped()) {
         long now = System.currentTimeMillis();
         if ((now - lastMsg) >= msgInterval) {
-          tryCompactionServerReport();
+          if (tryCompactionServerReport() && !online.get()) {
+            synchronized (online) {
+              online.set(true);
+              online.notifyAll();
+            }
+          }
           lastMsg = System.currentTimeMillis();
         }
         if (!isStopped()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ce36b7d..ca2e1df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -396,9 +396,6 @@ public class HRegionServer extends AbstractServer implements
   // A thread which calls reportProcedureDone
   private RemoteProcedureResultReporter procedureResultReporter;
 
-  // flag set after we're done setting up server threads
-  final AtomicBoolean online = new AtomicBoolean(false);
-
   // Log Splitting Worker
   private SplitLogWorker splitLogWorker;
 
@@ -1795,16 +1792,7 @@ public class HRegionServer extends AbstractServer implements
     }
   }
 
-  /**
-   * Report the status of the server. A server is online once all the startup is
-   * completed (setting up filesystem, starting executorService threads, etc.). This
-   * method is designed mostly to be useful in tests.
-   *
-   * @return true if online, false if not.
-   */
-  public boolean isOnline() {
-    return online.get();
-  }
+
 
   /**
   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
@@ -2228,18 +2216,6 @@ public class HRegionServer extends AbstractServer implements
     }
   }
 
-  public void waitForServerOnline(){
-    while (!isStopped() && !isOnline()) {
-      synchronized (online) {
-        try {
-          online.wait(msgInterval);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-          break;
-        }
-      }
-    }
-  }
 
   @Override
  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 924fbfd..bfb25ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1929,11 +1929,11 @@ public class RSRpcServices extends AbstractRpcServices implements
       int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
         HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
       long endTime = System.currentTimeMillis() + timeout;
-      synchronized (regionServer.online) {
+      synchronized (regionServer.getOnline()) {
         try {
           while (System.currentTimeMillis() <= endTime
               && !regionServer.isStopped() && !regionServer.isOnline()) {
-            regionServer.online.wait(regionServer.getMsgInterval());
+            regionServer.getOnline().wait(regionServer.getMsgInterval());
           }
           checkOpen();
         } catch (InterruptedException t) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
index cc0f49a..57a74a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
@@ -31,6 +31,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.compactionserver.HCompactionServer;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -98,6 +99,53 @@ public class JVMClusterUtil {
     return new JVMClusterUtil.RegionServerThread(server, index);
   }
 
+  /**
+   * Datastructure to hold CompactionServer Thread and CompactionServer instance
+   */
+  public static class CompactionServerThread extends Thread {
+    private final HCompactionServer compactionServer;
+
+    public CompactionServerThread(final HCompactionServer cs, final int index) {
+      super(cs, "CS:" + index + ";" + cs.getServerName().toShortString());
+      this.compactionServer = cs;
+    }
+
+    /** @return the compaction server */
+    public HCompactionServer getCompactionServer() {
+      return this.compactionServer;
+    }
+
+    /**
+     * Block until the compaction server has come online, indicating it is ready
+     * to be used.
+     */
+    public void waitForServerOnline() {
+      // The server is marked online after the init method completes inside of
+      // the HCS#run method.  HCS#init can fail for whatever reason.  In those
+      // cases, we'll jump out of the run without setting the online flag.  Check
+      // stopRequested so we don't wait here on a flag that will never be flipped.
+      compactionServer.waitForServerOnline();
+    }
+  }
+
+  /**
+   * Creates a {@link CompactionServerThread}.
+   * Call 'start' on the returned thread to make it run.
+   * @param c Configuration to use.
+   * @param index Used distinguishing the object returned.
+   * @throws IOException
+   * @return Compaction server added.
+   */
+  public static JVMClusterUtil.CompactionServerThread createCompactionServerThread(
+    final Configuration c, final int index) throws IOException {
+    HCompactionServer server;
+    try {
+      server = new HCompactionServer(c);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return new JVMClusterUtil.CompactionServerThread(server, index);
+  }
 
   /**
    * Datastructure to hold Master Thread and Master instance
@@ -122,7 +170,6 @@ public class JVMClusterUtil {
    * @param c Configuration to use.
    * @param hmc Class to create.
    * @param index Used distinguishing the object returned.
-   * @throws IOException
    * @return Master added.
    */
   public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
@@ -165,7 +212,8 @@ public class JVMClusterUtil {
    * @return Address to use contacting primary master.
    */
   public static String startup(final List<JVMClusterUtil.MasterThread> masters,
-      final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
+      final List<JVMClusterUtil.RegionServerThread> regionservers,
+      final List<JVMClusterUtil.CompactionServerThread> compactionservers) throws IOException {
     // Implementation note: This method relies on timed sleeps in a loop. It's not great, and
     // should probably be re-written to use actual synchronization objects, but it's ok for now
 
@@ -193,6 +241,11 @@ public class JVMClusterUtil {
       }
     }
 
+    if (compactionservers != null) {
+      for (JVMClusterUtil.CompactionServerThread t: compactionservers) {
+        t.start();
+      }
+    }
     // Wait for an active master to be initialized (implies being master)
     //  with this, when we return the cluster is complete
     final int initTimeout = configuration != null ? Integer.parseInt(
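
In isolation, the new creation path mirrors the master and region-server helpers
above it. A minimal sketch; it assumes the Configuration points at a running
ZooKeeper quorum and cluster, otherwise constructing the server will fail.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.JVMClusterUtil;

final class CompactionServerThreadSketch {
  static JVMClusterUtil.CompactionServerThread startOne() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Construction failures are wrapped in IOException, like the other helpers.
    JVMClusterUtil.CompactionServerThread cst =
      JVMClusterUtil.createCompactionServerThread(conf, 0);
    cst.start();                // runs HCompactionServer#run on its own thread
    cst.waitForServerOnline();  // returns once the online flag flips (or the server stops)
    return cst;
  }
}
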
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index a036f93..dcdcb19 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1115,7 +1115,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     TraceUtil.initTracer(c);
     this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
       option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
-      option.getMasterClass(), option.getRsClass());
+      option.getNumCompactionServers(), option.getMasterClass(), option.getRsClass());
     // Populate the master address configuration from mini cluster configuration.
     conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
     // Don't leave here till we've done a successful scan of the hbase:meta
@@ -1238,10 +1238,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
   public void restartHBaseCluster(StartMiniClusterOption option)
       throws IOException, InterruptedException {
     closeConnection();
-    this.hbaseCluster =
-        new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
-            option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
-            option.getRsClass());
+    this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(),
+        option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
+        option.getNumCompactionServers(), option.getMasterClass(), option.getRsClass());
     // Don't leave here till we've done a successful scan of the hbase:meta
     Connection conn = ConnectionFactory.createConnection(this.conf);
     Table t = conn.getTable(TableName.META_TABLE_NAME);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index ed141e7..fc36ac8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.compactionserver.HCompactionServer;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
@@ -86,31 +87,43 @@ public class MiniHBaseCluster extends HBaseCluster {
    * @param numRegionServers initial number of region servers to start.
    */
   public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
-         Class<? extends HMaster> masterClass,
-         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+      Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
       throws IOException, InterruptedException {
-    this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
+    this(conf, numMasters, 0, numRegionServers, null, 0, masterClass, regionserverClass);
   }
 
   /**
    * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
-   *   restart where for sure the regionservers come up on same address+port (but
-   *   just with different startcode); by default mini hbase clusters choose new
-   *   arbitrary ports on each cluster start.
+   *          restart where for sure the regionservers come up on same address+port (but just with
+   *          different startcode); by default mini hbase clusters choose new arbitrary ports on
+   *          each cluster start.
+   */
+  public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
+      int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+      throws IOException, InterruptedException {
+    this(conf, numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, 0, masterClass,
+        regionserverClass);
+  }
+
+  /**
+   * @param numCompactionServers initial number of compaction servers to start.
    * @throws IOException
    * @throws InterruptedException
    */
   public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
-         int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
-         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+      int numRegionServers, List<Integer> rsPorts, int numCompactionServers,
+      Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
       throws IOException, InterruptedException {
     super(conf);
 
     // Hadoop 2
     CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
 
-    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
-        regionserverClass);
+    init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, numCompactionServers,
+      masterClass, regionserverClass);
     this.initialClusterStatus = getClusterMetrics();
   }
 
@@ -227,14 +240,15 @@ public class MiniHBaseCluster extends HBaseCluster {
   }
 
   private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
-      final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
+      final int nRegionNodes, List<Integer> rsPorts, int numCompactionServers,
+      Class<? extends HMaster> masterClass,
       Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
-  throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
     try {
-      if (masterClass == null){
-        masterClass =  HMaster.class;
+      if (masterClass == null) {
+        masterClass = HMaster.class;
       }
-      if (regionserverClass == null){
+      if (regionserverClass == null) {
         regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
       }
 
@@ -253,6 +267,12 @@ public class MiniHBaseCluster extends HBaseCluster {
         hbaseCluster.addRegionServer(rsConf, i, user);
       }
 
+      // manually add the compaction servers as other users
+      for (int i = 0; i < numCompactionServers; i++) {
+        Configuration csConf = HBaseConfiguration.create(conf);
+        User user = HBaseTestingUtility.getDifferentUser(csConf, ".hfs." + index++);
+        hbaseCluster.addCompactionServer(csConf, i, user);
+      }
       hbaseCluster.startup();
     } catch (IOException e) {
       shutdown();
@@ -911,6 +931,42 @@ public class MiniHBaseCluster extends HBaseCluster {
     }
   }
 
+  /**
+   * @return Number of live compaction servers in the cluster currently.
+   */
+  public int getNumLiveCompactionServers() {
+    return this.hbaseCluster.getLiveCompactionServers().size();
+  }
+
+  /**
+   * @return List of compaction server threads.
+   */
+  public List<JVMClusterUtil.CompactionServerThread> getCompactionServerThreads() {
+    return this.hbaseCluster.getCompactionServers();
+  }
+
+  /**
+   * @return List of live compaction server threads (skips the aborted and the killed)
+   */
+  public List<JVMClusterUtil.CompactionServerThread> getLiveCompactionServerThreads() {
+    return this.hbaseCluster.getLiveCompactionServers();
+  }
+
+  /**
+   * Grab a numbered compaction server of your choice.
+   * @param serverNumber server number
+   * @return compaction server
+   */
+  public HCompactionServer getCompactionServer(int serverNumber) {
+    return hbaseCluster.getCompactionServer(serverNumber);
+  }
+
+  public HCompactionServer getCompactionServer(ServerName serverName) {
+    return hbaseCluster.getCompactionServers().stream()
+        .map(JVMClusterUtil.CompactionServerThread::getCompactionServer)
+        .filter(cs -> cs.getServerName().equals(serverName)).findFirst().orElse(null);
+  }
+
   @Override
   public void waitUntilShutDown() {
     this.hbaseCluster.join();
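
The new MiniHBaseCluster accessors side by side. A sketch assuming 'cluster' was
started with at least one compaction server; per the code above, the ServerName
lookup returns null when nothing matches.

import java.util.List;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.compactionserver.HCompactionServer;
import org.apache.hadoop.hbase.util.JVMClusterUtil;

final class MiniClusterAccessorSketch {
  static void dump(MiniHBaseCluster cluster) {
    int live = cluster.getNumLiveCompactionServers();
    List<JVMClusterUtil.CompactionServerThread> threads =
      cluster.getCompactionServerThreads();
    HCompactionServer first = cluster.getCompactionServer(0);
    ServerName name = first.getServerName();
    // Lookup by ServerName scans all compaction server threads.
    HCompactionServer same = cluster.getCompactionServer(name);
    System.out.println(live + " live of " + threads.size()
      + ", first=" + name + ", lookup hit=" + (same == first));
  }
}
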
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
index 7a9bd68..d7b8951 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
@@ -77,6 +77,11 @@ public final class StartMiniClusterOption {
   private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass;
 
   /**
+   * Number of compaction servers to start up.
+   */
+  private final int numCompactionServers;
+
+  /**
   * Number of datanodes. Used to create mini DSF cluster. Surpassed by {@link #dataNodeHosts} size.
    */
   private final int numDataNodes;
@@ -109,14 +114,16 @@ public final class StartMiniClusterOption {
    */
   private StartMiniClusterOption(int numMasters, int numAlwaysStandByMasters,
      Class<? extends HMaster> masterClass, int numRegionServers, List<Integer> rsPorts,
-      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, int numDataNodes,
-      String[] dataNodeHosts, int numZkServers, boolean createRootDir, boolean createWALDir) {
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
+      int numCompactionServers, int numDataNodes, String[] dataNodeHosts, int numZkServers,
+      boolean createRootDir, boolean createWALDir) {
     this.numMasters = numMasters;
     this.numAlwaysStandByMasters = numAlwaysStandByMasters;
     this.masterClass = masterClass;
     this.numRegionServers = numRegionServers;
     this.rsPorts = rsPorts;
     this.rsClass = rsClass;
+    this.numCompactionServers = numCompactionServers;
     this.numDataNodes = numDataNodes;
     this.dataNodeHosts = dataNodeHosts;
     this.numZkServers = numZkServers;
@@ -148,6 +155,10 @@ public final class StartMiniClusterOption {
     return rsClass;
   }
 
+  public int getNumCompactionServers() {
+    return numCompactionServers;
+  }
+
   public int getNumDataNodes() {
     return numDataNodes;
   }
@@ -172,9 +183,10 @@ public final class StartMiniClusterOption {
   public String toString() {
     return "StartMiniClusterOption{" + "numMasters=" + numMasters + ", masterClass=" + masterClass
         + ", numRegionServers=" + numRegionServers + ", rsPorts=" + StringUtils.join(rsPorts)
-        + ", rsClass=" + rsClass + ", numDataNodes=" + numDataNodes
-        + ", dataNodeHosts=" + Arrays.toString(dataNodeHosts) + ", numZkServers=" + numZkServers
-        + ", createRootDir=" + createRootDir + ", createWALDir=" + createWALDir + '}';
+        + ", rsClass=" + rsClass + ", numCompactionServers=" + numCompactionServers
+        + ", numDataNodes=" + numDataNodes + ", dataNodeHosts=" + Arrays.toString(dataNodeHosts)
+        + ", numZkServers=" + numZkServers + ", createRootDir=" + createRootDir + ", createWALDir="
+        + createWALDir + '}';
   }
 
   /**
@@ -196,6 +208,7 @@ public final class StartMiniClusterOption {
     private Class<? extends HMaster> masterClass = null;
     private int numRegionServers = 1;
     private List<Integer> rsPorts = null;
+    private int numCompactionServers;
     private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass = null;
     private int numDataNodes = 1;
     private String[] dataNodeHosts = null;
@@ -210,9 +223,9 @@ public final class StartMiniClusterOption {
       if (dataNodeHosts != null && dataNodeHosts.length != 0) {
         numDataNodes = dataNodeHosts.length;
       }
-      return new StartMiniClusterOption(numMasters,numAlwaysStandByMasters, masterClass,
-          numRegionServers, rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers,
-          createRootDir, createWALDir);
+      return new StartMiniClusterOption(numMasters, numAlwaysStandByMasters, masterClass,
+          numRegionServers, rsPorts, rsClass, numCompactionServers, numDataNodes, dataNodeHosts,
+          numZkServers, createRootDir, createWALDir);
     }
 
     public Builder numMasters(int numMasters) {
@@ -235,6 +248,11 @@ public final class StartMiniClusterOption {
       return this;
     }
 
+    public Builder numCompactionServers(int numCompactionServers) {
+      this.numCompactionServers = numCompactionServers;
+      return this;
+    }
+
     public Builder rsPorts(List<Integer> rsPorts) {
       this.rsPorts = rsPorts;
       return this;
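
The builder keeps its fluent shape; numCompactionServers defaults to 0, so existing
callers are unaffected. A sketch with illustrative counts:

import org.apache.hadoop.hbase.StartMiniClusterOption;

final class OptionBuilderSketch {
  static StartMiniClusterOption oneOfEach() {
    return StartMiniClusterOption.builder()
      .numMasters(1)
      .numRegionServers(1)
      .numCompactionServers(1)  // new in this change; 0 if omitted
      .build();
  }
}
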
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
new file mode 100644
index 0000000..1edc414
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.compaction.CompactionOffloadManager;
+import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+@Category({CompactionServerTests.class, MediumTests.class})
+public class TestCompactionServer {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCompactionServer.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestCompactionServer.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration CONF = TEST_UTIL.getConfiguration();
+  private static HMaster MASTER;
+  private static HCompactionServer COMPACTION_SERVER;
+  private static ServerName COMPACTION_SERVER_NAME;
+  private static TableName TABLENAME = TableName.valueOf("t");
+  private static String FAMILY = "C";
+  private static String COL = "c0";
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1).build());
+    MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
+      .getCompactionServer();
+    COMPACTION_SERVER_NAME = COMPACTION_SERVER.getServerName();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws Exception {
+    TEST_UTIL.createTable(TABLENAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+  }
+
+  @After
+  public void after() throws IOException {
+    TEST_UTIL.deleteTableIfAny(TABLENAME);
+  }
+
+  @Test
+  public void testCompactionServerExpire() throws Exception {
+    int initialNum = TEST_UTIL.getMiniHBaseCluster().getNumLiveCompactionServers();
+    CONF.setInt(HConstants.COMPACTION_SERVER_PORT, HConstants.DEFAULT_COMPACTION_SERVER_PORT + 1);
+    CONF.setInt(HConstants.COMPACTION_SERVER_INFO_PORT,
+      HConstants.DEFAULT_COMPACTION_SERVER_INFOPORT + 1);
+    HCompactionServer compactionServer = new HCompactionServer(CONF);
+    compactionServer.start();
+    ServerName compactionServerName = compactionServer.getServerName();
+
+    CompactionOffloadManager compactionOffloadManager = MASTER.getCompactionOffloadManager();
+    TEST_UTIL.waitFor(60000,
+      () -> initialNum + 1 == compactionOffloadManager.getOnlineServersList().size());
+
+    compactionServer.stop("test");
+
+    TEST_UTIL.waitFor(60000,
+      () -> initialNum == compactionOffloadManager.getOnlineServersList().size());
+  }
+}
diff --git a/pom.xml b/pom.xml
index aeb8bd4..005b573 100755
--- a/pom.xml
+++ b/pom.xml
@@ -3419,6 +3419,22 @@
       </properties>
     </profile>
     <profile>
+      <id>runCompactionServerTests</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <surefire.firstPartForkCount>1</surefire.firstPartForkCount>
+        <surefire.secondPartForkCount>1</surefire.secondPartForkCount>
+        <surefire.skipFirstPart>false</surefire.skipFirstPart>
+        <surefire.skipSecondPart>true</surefire.skipSecondPart>
+        <surefire.firstPartGroups>
+          org.apache.hadoop.hbase.testclassification.CompactionServerTests
+        </surefire.firstPartGroups>
+        <surefire.secondPartGroups></surefire.secondPartGroups>
+      </properties>
+    </profile>
+    <profile>
       <id>runVerySlowMapReduceTests</id>
       <activation>
         <activeByDefault>false</activeByDefault>
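
Assuming the new profile is meant to be driven the same way as the neighbouring
surefire profiles, the CompactionServerTests category would presumably be run with
something like "mvn test -P runCompactionServerTests" from the project root.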
