hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [26/34] hbase git commit: HBASE-4224 Need a flush by regionserver rather than by table option
Date Wed, 24 Jan 2018 09:51:08 GMT
HBASE-4224 Need a flush by regionserver rather than by table option


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/541f8ad8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/541f8ad8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/541f8ad8

Branch: refs/heads/HBASE-19064
Commit: 541f8ad8a8515886351cfaa1aacd9cfed1296bf9
Parents: 93a182f
Author: Chia-Ping Tsai <chia7712@gmail.com>
Authored: Sun Dec 17 01:59:19 2017 +0800
Committer: Chia-Ping Tsai <chia7712@gmail.com>
Committed: Tue Jan 23 09:51:05 2018 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Admin.java   |   7 +
 .../apache/hadoop/hbase/client/AsyncAdmin.java  |   6 +
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    |   5 +
 .../apache/hadoop/hbase/client/HBaseAdmin.java  |  35 ++--
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java |  49 ++++--
 .../hbase/client/TestFlushFromClient.java       | 176 +++++++++++++++++++
 6 files changed, 254 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/541f8ad8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 6729473..b8546fa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -826,6 +826,13 @@ public interface Admin extends Abortable, Closeable {
   void flushRegion(byte[] regionName) throws IOException;
 
   /**
+   * Flush all regions on the region server. Synchronous operation.
+   * @param serverName the region server name to flush
+   * @throws IOException if a remote or network exception occurs
+   */
+  void flushRegionServer(ServerName serverName) throws IOException;
+
+  /**
    * Compact a table. Asynchronous operation in that this method requests that a
    * Compaction run and then it returns. It does not wait on the completion of Compaction
    * (it can take a while).

http://git-wip-us.apache.org/repos/asf/hbase/blob/541f8ad8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index a375265..35cdd3f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -295,6 +295,12 @@ public interface AsyncAdmin {
   CompletableFuture<Void> flushRegion(byte[] regionName);
 
   /**
+   * Flush all regions on the region server.
+   * @param serverName server to flush
+   */
+  CompletableFuture<Void> flushRegionServer(ServerName serverName);
+
+  /**
    * Compact a table. When the returned CompletableFuture is done, it only means the compact request
    * was sent to HBase and may need some time to finish the compact operation.
    * @param tableName table to compact

http://git-wip-us.apache.org/repos/asf/hbase/blob/541f8ad8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index d0d19c1..9b2390c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -243,6 +243,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
+  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
+    return wrap(rawAdmin.flushRegionServer(sn));
+  }
+
+  @Override
   public CompletableFuture<Void> compact(TableName tableName,
       CompactType compactType) {
     return wrap(rawAdmin.compact(tableName, compactType));

http://git-wip-us.apache.org/repos/asf/hbase/blob/541f8ad8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 1a87b48..8685984 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -1188,21 +1188,28 @@ public class HBaseAdmin implements Admin {
     if (regionServerPair.getSecond() == null) {
       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
     }
-    final RegionInfo hRegionInfo = regionServerPair.getFirst();
+    final RegionInfo regionInfo = regionServerPair.getFirst();
     ServerName serverName = regionServerPair.getSecond();
-    final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
-    Callable<Void> callable = new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        // TODO: There is no timeout on this controller. Set one!
-        HBaseRpcController controller = rpcControllerFactory.newController();
-        FlushRegionRequest request =
-            RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
-        admin.flushRegion(controller, request);
-        return null;
-      }
-    };
-    ProtobufUtil.call(callable);
+    flush(this.connection.getAdmin(serverName), regionInfo);
+  }
+
+  private void flush(AdminService.BlockingInterface admin, final RegionInfo info)
+    throws IOException {
+    ProtobufUtil.call(() -> {
+      // TODO: There is no timeout on this controller. Set one!
+      HBaseRpcController controller = rpcControllerFactory.newController();
+      FlushRegionRequest request =
+        RequestConverter.buildFlushRegionRequest(info.getRegionName());
+      admin.flushRegion(controller, request);
+      return null;
+    });
+  }
+
+  @Override
+  public void flushRegionServer(ServerName serverName) throws IOException {
+    for (RegionInfo region : getRegions(serverName)) {
+      flush(this.connection.getAdmin(serverName), region);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/541f8ad8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 5a18afe..050bfe2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -827,25 +827,54 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
               .toStringBinary(regionName)));
           return;
         }
-
-        RegionInfo regionInfo = location.getRegion();
-        this.<Void> newAdminCaller()
-            .serverName(serverName)
-            .action(
-              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
-                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
-                resp -> null)).call().whenComplete((ret, err2) -> {
+        flush(serverName, location.getRegion())
+          .whenComplete((ret, err2) -> {
               if (err2 != null) {
                 future.completeExceptionally(err2);
               } else {
                 future.complete(ret);
               }
-            });
+          });
       });
     return future;
   }
 
+  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
+    return this.<Void> newAdminCaller()
+            .serverName(serverName)
+            .action(
+              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
+                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
+                  .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
+                resp -> null))
+            .call();
+  }
+
+  @Override
+  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegions(sn).whenComplete((hRegionInfos, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
+      if (hRegionInfos != null) {
+        hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
+      }
+      CompletableFuture
+        .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
+        .whenComplete((ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
+    return future;
+  }
+
   @Override
   public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
     return compact(tableName, null, false, compactType);

http://git-wip-us.apache.org/repos/asf/hbase/blob/541f8ad8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
new file mode 100644
index 0000000..9085fa5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
@@ -0,0 +1,176 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({MediumTests.class, ClientTests.class})
+public class TestFlushFromClient {
+  private static final Log LOG = LogFactory.getLog(TestFlushFromClient.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static AsyncConnection asyncConn;
+  private static final byte[][] SPLITS = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("7")};
+  private static final List<byte[]> ROWS = Arrays.asList(
+    Bytes.toBytes("1"),
+    Bytes.toBytes("4"),
+    Bytes.toBytes("8"));
+  private static final byte[] FAMILY = Bytes.toBytes("f1");
+
+  @Rule
+  public TestName name = new TestName();
+
+  public TableName tableName;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(ROWS.size());
+    asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    IOUtils.cleanup(null, asyncConn);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    tableName = TableName.valueOf(name.getMethodName());
+    try (Table t = TEST_UTIL.createTable(tableName, FAMILY, SPLITS)) {
+      List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList());
+      for (int i = 0; i != 20; ++i) {
+        byte[] value = Bytes.toBytes(i);
+        puts.forEach(p -> p.addColumn(FAMILY, value, value));
+      }
+      t.put(puts);
+    }
+    assertFalse(getRegionInfo().isEmpty());
+    assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreSize() != 0));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
+      LOG.info("Tear down, remove table=" + htd.getTableName());
+      TEST_UTIL.deleteTable(htd.getTableName());
+    }
+  }
+
+  @Test
+  public void testFlushTable() throws Exception {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      admin.flush(tableName);
+      assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreSize() != 0));
+    }
+  }
+
+  @Test
+  public void testAsyncFlushTable() throws Exception {
+    AsyncAdmin admin = asyncConn.getAdmin();
+    admin.flush(tableName).get();
+    assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreSize() != 0));
+  }
+
+  @Test
+  public void testFlushRegion() throws Exception {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      for (HRegion r : getRegionInfo()) {
+        admin.flushRegion(r.getRegionInfo().getRegionName());
+        TimeUnit.SECONDS.sleep(1);
+        assertEquals(0, r.getMemStoreSize());
+      }
+    }
+  }
+
+  @Test
+  public void testAsyncFlushRegion() throws Exception {
+    AsyncAdmin admin = asyncConn.getAdmin();
+    for (HRegion r : getRegionInfo()) {
+      admin.flushRegion(r.getRegionInfo().getRegionName()).get();
+      TimeUnit.SECONDS.sleep(1);
+      assertEquals(0, r.getMemStoreSize());
+    }
+  }
+
+  @Test
+  public void testFlushRegionServer() throws Exception {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      for (HRegionServer rs : TEST_UTIL.getHBaseCluster()
+            .getLiveRegionServerThreads()
+            .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
+            .collect(Collectors.toList())) {
+        admin.flushRegionServer(rs.getServerName());
+        assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreSize() != 0));
+      }
+    }
+  }
+
+  @Test
+  public void testAsyncFlushRegionServer() throws Exception {
+    AsyncAdmin admin = asyncConn.getAdmin();
+    for (HRegionServer rs : TEST_UTIL.getHBaseCluster()
+      .getLiveRegionServerThreads()
+      .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
+      .collect(Collectors.toList())) {
+      admin.flushRegionServer(rs.getServerName()).get();
+      assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreSize() != 0));
+    }
+  }
+
+  private List<HRegion> getRegionInfo() {
+    return TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
+      .map(JVMClusterUtil.RegionServerThread::getRegionServer)
+      .flatMap(r -> r.getRegions().stream())
+      .filter(r -> r.getTableDescriptor().getTableName().equals(tableName))
+      .collect(Collectors.toList());
+  }
+
+  private List<HRegion> getRegionInfo(HRegionServer rs) {
+    return rs.getRegions().stream()
+      .filter(v -> v.getTableDescriptor().getTableName().equals(tableName))
+      .collect(Collectors.toList());
+  }
+}


Mime
View raw message