hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject hbase git commit: HBASE-17667: Implement async flush/compact region methods
Date Fri, 05 May 2017 13:13:31 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 01af27061 -> 2026540ea


HBASE-17667: Implement async flush/compact region methods

Signed-off-by: Guanghao Zhang <zghao@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2026540e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2026540e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2026540e

Branch: refs/heads/master
Commit: 2026540ea347e9359e6ac8cf8b3701cd3872a515
Parents: 01af270
Author: huzheng <openinx@gmail.com>
Authored: Thu May 4 17:58:55 2017 +0800
Committer: Guanghao Zhang <zghao@apache.org>
Committed: Fri May 5 21:12:49 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/AsyncAdmin.java  |  85 ++++++
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    | 278 ++++++++++++++++++-
 .../hbase/shaded/protobuf/ProtobufUtil.java     |   2 +-
 .../hbase/client/TestAsyncRegionAdminApi.java   | 227 +++++++++++++++
 4 files changed, 590 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2026540e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index b764726..3d63705 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -406,6 +406,91 @@ public interface AsyncAdmin {
   CompletableFuture<Void> closeRegion(ServerName sn, HRegionInfo hri);
 
   /**
+   * Get all the online regions on a region server.
+   */
+  CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn);
+
+  /**
+   * Flush a table.
+   * @param tableName table to flush
+   */
+  CompletableFuture<Void> flush(TableName tableName);
+
+  /**
+   * Flush an individual region.
+   * @param regionName region to flush
+   */
+  CompletableFuture<Void> flushRegion(byte[] regionName);
+
+  /**
+   * Compact a table. Asynchronous operation even if CompletableFuture.get().
+   * @param tableName table to compact
+   */
+  CompletableFuture<Void> compact(TableName tableName);
+
+  /**
+   * Compact a column family within a table. Asynchronous operation even if CompletableFuture.get().
+   * @param tableName table to compact
+   * @param columnFamily column family within a table
+   */
+  CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily);
+
+  /**
+   * Compact an individual region. Asynchronous operation even if CompletableFuture.get().
+   * @param regionName region to compact
+   */
+  CompletableFuture<Void> compactRegion(byte[] regionName);
+
+  /**
+   * Compact a column family within a region. Asynchronous operation even if
+   * CompletableFuture.get().
+   * @param regionName region to compact
+   * @param columnFamily column family within a region
+   */
+  CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily);
+
+  /**
+   * Major compact a table. Asynchronous operation even if CompletableFuture.get().
+   * @param tableName table to major compact
+   */
+  CompletableFuture<Void> majorCompact(TableName tableName);
+
+  /**
+   * Major compact a column family within a table. Asynchronous operation even if
+   * CompletableFuture.get().
+   * @param tableName table to major compact
+   * @param columnFamily column family within a table
+   */
+  CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily);
+
+  /**
+   * Major compact a table or an individual region. Asynchronous operation even if
+   * CompletableFuture.get().
+   * @param regionName region to major compact
+   */
+  CompletableFuture<Void> majorCompactRegion(byte[] regionName);
+
+  /**
+   * Major compact a column family within region. Asynchronous operation even if
+   * CompletableFuture.get().
+   * @param regionName egion to major compact
+   * @param columnFamily column family within a region
+   */
+  CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily);
+
+  /**
+   * Compact all regions on the region server.
+   * @param sn the region server name
+   */
+  CompletableFuture<Void> compactRegionServer(ServerName sn);
+
+  /**
+   * Compact all regions on the region server.
+   * @param sn the region server name
+   */
+  CompletableFuture<Void> majorCompactRegionServer(ServerName sn);
+
+  /**
    * Merge two regions.
    * @param nameOfRegionA encoded or full name of region a
    * @param nameOfRegionB encoded or full name of region b

http://git-wip-us.apache.org/repos/asf/hbase/blob/2026540e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 019d0c6..baad871 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -40,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
 import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotEnabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -80,10 +82,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
@@ -185,6 +192,7 @@ import org.apache.hadoop.hbase.util.Pair;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class AsyncHBaseAdmin implements AsyncAdmin {
+  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
 
   private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
 
@@ -853,6 +861,274 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
         .serverName(sn).call();
   }
 
+  @Override
+  public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn)
{
+    return this.<List<HRegionInfo>> newAdminCaller()
+        .action((controller, stub) -> this
+            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>>
adminCall(
+              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
+              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
+              resp -> ProtobufUtil.getRegionInfos(resp)))
+        .serverName(sn).call();
+  }
+
+  @Override
+  public CompletableFuture<Void> flush(TableName tableName) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    tableExists(tableName).whenComplete((exists, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+      } else if (!exists) {
+        future.completeExceptionally(new TableNotFoundException(tableName));
+      } else {
+        isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else if (!tableEnabled) {
+            future.completeExceptionally(new TableNotEnabledException(tableName));
+          } else {
+            execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
+              new HashMap<>()).whenComplete((ret, err3) -> {
+                if (err3 != null) {
+                  future.completeExceptionally(err3);
+                } else {
+                  future.complete(ret);
+                }
+              });
+          }
+        });
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Void> flushRegion(byte[] regionName) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegion(regionName).whenComplete((p, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      if (p == null || p.getFirst() == null) {
+        future.completeExceptionally(
+          new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)));
+        return;
+      }
+      if (p.getSecond() == null) {
+        future.completeExceptionally(
+          new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+
+      this.<Void> newAdminCaller().serverName(p.getSecond())
+          .action((controller, stub) -> this
+              .<FlushRegionRequest, FlushRegionResponse, Void> adminCall(controller,
stub,
+                RequestConverter.buildFlushRegionRequest(p.getFirst().getRegionName()),
+                (s, c, req, done) -> s.flushRegion(c, req, done), resp -> null))
+          .call().whenComplete((ret, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(ret);
+            }
+          });
+    });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Void> compact(TableName tableName) {
+    return compact(tableName, null, false, CompactType.NORMAL);
+  }
+
+  @Override
+  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily)
{
+    return compact(tableName, columnFamily, false, CompactType.NORMAL);
+  }
+
+  @Override
+  public CompletableFuture<Void> compactRegion(byte[] regionName) {
+    return compactRegion(regionName, null, false);
+  }
+
+  @Override
+  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily)
{
+    return compactRegion(regionName, columnFamily, false);
+  }
+
+  @Override
+  public CompletableFuture<Void> majorCompact(TableName tableName) {
+    return compact(tableName, null, true, CompactType.NORMAL);
+  }
+
+  @Override
+  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily)
{
+    return compact(tableName, columnFamily, true, CompactType.NORMAL);
+  }
+
+  @Override
+  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
+    return compactRegion(regionName, null, true);
+  }
+
+  @Override
+  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily)
{
+    return compactRegion(regionName, columnFamily, true);
+  }
+
+  @Override
+  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
+    return compactRegionServer(sn, false);
+  }
+
+  @Override
+  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
+    return compactRegionServer(sn, true);
+  }
+
+  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major)
{
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
+      if (hRegionInfos != null) {
+        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
+      }
+      CompletableFuture
+          .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
+          .whenComplete((ret, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(ret);
+            }
+          });
+    });
+    return future;
+  }
+
+  private CompletableFuture<Void> compactRegion(final byte[] regionName, final byte[]
columnFamily,
+      final boolean major) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegion(regionName).whenComplete((p, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      if (p == null || p.getFirst() == null) {
+        future.completeExceptionally(
+          new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)));
+        return;
+      }
+      if (p.getSecond() == null) {
+        // found a region without region server assigned.
+        future.completeExceptionally(
+          new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      compact(p.getSecond(), p.getFirst(), major, columnFamily).whenComplete((ret, err2)
-> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
+        } else {
+          future.complete(ret);
+        }
+      });
+    });
+    return future;
+  }
+
+  /**
+   * List all region locations for the specific table.
+   */
+  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName
tableName) {
+    CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
+    if (TableName.META_TABLE_NAME.equals(tableName)) {
+      // For meta table, we use zk to fetch all locations.
+      AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
+      registry.getMetaRegionLocation().whenComplete((metaRegions, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+        } else if (metaRegions == null || metaRegions.isEmpty()
+            || metaRegions.getDefaultRegionLocation() == null) {
+          future.completeExceptionally(new IOException("meta region does not found"));
+        } else {
+          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
+        }
+        // close the registry.
+        IOUtils.closeQuietly(registry);
+      });
+    } else {
+      // For non-meta table, we fetch all locations by scanning hbase:meta table
+      AsyncMetaTableAccessor.getTableRegionsAndLocations(metaTable, Optional.of(tableName))
+          .whenComplete((locations, err) -> {
+            if (err != null) {
+              future.completeExceptionally(err);
+            } else if (locations == null || locations.isEmpty()) {
+              future.complete(Collections.emptyList());
+            } else {
+              List<HRegionLocation> regionLocations = locations.stream()
+                  .map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
+                  .collect(Collectors.toList());
+              future.complete(regionLocations);
+            }
+          });
+    }
+    return future;
+  }
+
+  /**
+   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
+   */
+  private CompletableFuture<Void> compact(final TableName tableName, final byte[] columnFamily,
+      final boolean major, CompactType compactType) {
+    if (CompactType.MOB.equals(compactType)) {
+      // TODO support MOB compact.
+      return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
+    }
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
+      for (HRegionLocation location : locations) {
+        if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue;
+        if (location.getServerName() == null) continue;
+        compactFutures
+            .add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily));
+      }
+      // future complete unless all of the compact futures are completed.
+      CompletableFuture
+          .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
+          .whenComplete((ret, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(ret);
+            }
+          });
+    });
+    return future;
+  }
+
+  /**
+   * Compact the region at specific region server.
+   */
+  private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri,
+      final boolean major, final byte[] family) {
+    return this.<Void> newAdminCaller().serverName(sn)
+        .action((controller, stub) -> this
+            .<CompactRegionRequest, CompactRegionResponse, Void> adminCall(controller,
stub,
+              RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family),
+              (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
+        .call();
+  }
+
   private byte[] toEncodeRegionName(byte[] regionName) {
     try {
       return HRegionInfo.isEncodedRegionName(regionName) ? regionName

http://git-wip-us.apache.org/repos/asf/hbase/blob/2026540e/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index ff576f7..2e62deb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -1943,7 +1943,7 @@ public final class ProtobufUtil {
    * @param proto the GetOnlineRegionResponse
    * @return the list of region info or null if <code>proto</code> is null
    */
-  static List<HRegionInfo> getRegionInfos(final GetOnlineRegionResponse proto) {
+  public static List<HRegionInfo> getRegionInfos(final GetOnlineRegionResponse proto)
{
     if (proto == null) return null;
     List<HRegionInfo> regionInfos = new ArrayList<>(proto.getRegionInfoList().size());
     for (RegionInfo regionInfo: proto.getRegionInfoList()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/2026540e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
index 038d6d4..04bd224 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
@@ -25,6 +25,9 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -40,12 +43,15 @@ import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -55,6 +61,8 @@ import org.junit.experimental.categories.Category;
 @Category({ MediumTests.class, ClientTests.class })
 public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
 
+  public static Random RANDOM = new Random(System.currentTimeMillis());
+
   private void createTableWithDefaultConf(TableName TABLENAME) throws Exception {
     HTableDescriptor htd = new HTableDescriptor(TABLENAME);
     HColumnDescriptor hcd = new HColumnDescriptor("value");
@@ -440,4 +448,223 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
       TEST_UTIL.deleteTable(tableName);
     }
   }
+
+  @Test
+  public void testGetOnlineRegions() throws Exception {
+    final TableName tableName = TableName.valueOf("testGetOnlineRegions");
+    try {
+      createTableAndGetOneRegion(tableName);
+      AtomicInteger regionServerCount = new AtomicInteger(0);
+      TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
+          .map(rsThread -> rsThread.getRegionServer().getServerName()).forEach(serverName
-> {
+            try {
+              Assert.assertEquals(admin.getOnlineRegions(serverName).get().size(),
+                TEST_UTIL.getAdmin().getOnlineRegions(serverName).size());
+            } catch (Exception e) {
+              fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
+            }
+            regionServerCount.incrementAndGet();
+          });
+      Assert.assertEquals(regionServerCount.get(), 2);
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  @Test
+  public void testFlushTableAndRegion() throws Exception {
+    final TableName tableName = TableName.valueOf("testFlushRegion");
+    try {
+      HRegionInfo hri = createTableAndGetOneRegion(tableName);
+      ServerName serverName = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
+          .getRegionStates().getRegionServerOfRegion(hri);
+      HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
+          .map(rsThread -> rsThread.getRegionServer())
+          .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
+      // write a put into the specific region
+      try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+        table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")));
+      }
+      Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize()
> 0);
+      // flush region and wait flush operation finished.
+      LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
+      admin.flushRegion(hri.getRegionName()).get();
+      LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
+      Threads.sleepWithoutInterrupt(500);
+      while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0)
{
+        Threads.sleep(50);
+      }
+      // check the memstore.
+      Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(),
0);
+
+      // write another put into the specific region
+      try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+        table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")));
+      }
+      Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize()
> 0);
+      admin.flush(tableName).get();
+      Threads.sleepWithoutInterrupt(500);
+      while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0)
{
+        Threads.sleep(50);
+      }
+      // check the memstore.
+      Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(),
0);
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  @Test(timeout = 600000)
+  public void testCompactRpcAPI() throws Exception {
+    String tableName = "testCompactRpcAPI";
+    compactionTest(tableName, 8, CompactionState.MAJOR, false);
+    compactionTest(tableName, 15, CompactionState.MINOR, false);
+    compactionTest(tableName, 8, CompactionState.MAJOR, true);
+    compactionTest(tableName, 15, CompactionState.MINOR, true);
+  }
+
+  @Test(timeout = 600000)
+  public void testCompactRegionServer() throws Exception {
+    TableName table = TableName.valueOf("testCompactRegionServer");
+    byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
+    Table ht = null;
+    try {
+      ht = TEST_UTIL.createTable(table, families);
+      loadData(ht, families, 3000, 8);
+      List<HRegionServer> rsList = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
+          .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
+      List<Region> regions = new ArrayList<>();
+      rsList.forEach(rs -> regions.addAll(rs.getOnlineRegions(table)));
+      Assert.assertEquals(regions.size(), 1);
+      int countBefore = countStoreFilesInFamilies(regions, families);
+      Assert.assertTrue(countBefore > 0);
+      // Minor compaction for all region servers.
+      for (HRegionServer rs : rsList)
+        admin.compactRegionServer(rs.getServerName()).get();
+      Thread.sleep(5000);
+      int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
+      Assert.assertTrue(countAfterMinorCompaction < countBefore);
+      // Major compaction for all region servers.
+      for (HRegionServer rs : rsList)
+        admin.majorCompactRegionServer(rs.getServerName()).get();
+      Thread.sleep(5000);
+      int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
+      Assert.assertEquals(countAfterMajorCompaction, 3);
+    } finally {
+      if (ht != null) {
+        TEST_UTIL.deleteTable(table);
+      }
+    }
+  }
+
+  private void compactionTest(final String tableName, final int flushes,
+      final CompactionState expectedState, boolean singleFamily) throws Exception {
+    // Create a table with regions
+    final TableName table = TableName.valueOf(tableName);
+    byte[] family = Bytes.toBytes("family");
+    byte[][] families =
+        { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3"))
};
+    Table ht = null;
+    try {
+      ht = TEST_UTIL.createTable(table, families);
+      loadData(ht, families, 3000, flushes);
+      List<Region> regions = new ArrayList<>();
+      TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads()
+          .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getOnlineRegions(table)));
+      Assert.assertEquals(regions.size(), 1);
+      int countBefore = countStoreFilesInFamilies(regions, families);
+      int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
+      assertTrue(countBefore > 0); // there should be some data files
+      if (expectedState == CompactionState.MINOR) {
+        if (singleFamily) {
+          admin.compact(table, family).get();
+        } else {
+          admin.compact(table).get();
+        }
+      } else {
+        if (singleFamily) {
+          admin.majorCompact(table, family).get();
+        } else {
+          admin.majorCompact(table).get();
+        }
+      }
+      long curt = System.currentTimeMillis();
+      long waitTime = 5000;
+      long endt = curt + waitTime;
+      CompactionState state = TEST_UTIL.getAdmin().getCompactionState(table);
+      while (state == CompactionState.NONE && curt < endt) {
+        Thread.sleep(10);
+        state = TEST_UTIL.getAdmin().getCompactionState(table);
+        curt = System.currentTimeMillis();
+      }
+      // Now, should have the right compaction state,
+      // otherwise, the compaction should have already been done
+      if (expectedState != state) {
+        for (Region region : regions) {
+          state = CompactionState.valueOf(region.getCompactionState().toString());
+          assertEquals(CompactionState.NONE, state);
+        }
+      } else {
+        // Wait until the compaction is done
+        state = TEST_UTIL.getAdmin().getCompactionState(table);
+        while (state != CompactionState.NONE && curt < endt) {
+          Thread.sleep(10);
+          state = TEST_UTIL.getAdmin().getCompactionState(table);
+        }
+        // Now, compaction should be done.
+        assertEquals(CompactionState.NONE, state);
+      }
+      int countAfter = countStoreFilesInFamilies(regions, families);
+      int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
+      assertTrue(countAfter < countBefore);
+      if (!singleFamily) {
+        if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
+        else assertTrue(families.length < countAfter);
+      } else {
+        int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
+        // assert only change was to single column family
+        assertTrue(singleFamDiff == (countBefore - countAfter));
+        if (expectedState == CompactionState.MAJOR) {
+          assertTrue(1 == countAfterSingleFamily);
+        } else {
+          assertTrue(1 < countAfterSingleFamily);
+        }
+      }
+    } finally {
+      if (ht != null) {
+        TEST_UTIL.deleteTable(table);
+      }
+    }
+  }
+
+  private static int countStoreFilesInFamily(List<Region> regions, final byte[] family)
{
+    return countStoreFilesInFamilies(regions, new byte[][] { family });
+  }
+
+  private static int countStoreFilesInFamilies(List<Region> regions, final byte[][]
families) {
+    int count = 0;
+    for (Region region : regions) {
+      count += region.getStoreFileList(families).size();
+    }
+    return count;
+  }
+
+  private static void loadData(final Table ht, final byte[][] families, final int rows,
+      final int flushes) throws IOException {
+    List<Put> puts = new ArrayList<>(rows);
+    byte[] qualifier = Bytes.toBytes("val");
+    for (int i = 0; i < flushes; i++) {
+      for (int k = 0; k < rows; k++) {
+        byte[] row = Bytes.toBytes(RANDOM.nextLong());
+        Put p = new Put(row);
+        for (int j = 0; j < families.length; ++j) {
+          p.addColumn(families[j], qualifier, row);
+        }
+        puts.add(p);
+      }
+      ht.put(puts);
+      TEST_UTIL.flush();
+      puts.clear();
+    }
+  }
 }


Mime
View raw message