hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-15537 Make multi WAL work with WALs other than FSHLog
Date Fri, 08 Apr 2016 02:46:05 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 d94c18a0e -> 2b7556270


HBASE-15537 Make multi WAL work with WALs other than FSHLog


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2b755627
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2b755627
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2b755627

Branch: refs/heads/branch-1
Commit: 2b75562701a2c187f30cb50219817984a36bf013
Parents: d94c18a
Author: zhangduo <zhangduo@apache.org>
Authored: Fri Apr 8 10:42:40 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Fri Apr 8 10:42:40 2016 +0800

----------------------------------------------------------------------
 .../hbase/wal/RegionGroupingProvider.java       | 134 ++++++++-----------
 .../org/apache/hadoop/hbase/wal/WALFactory.java |  30 +++--
 .../TestReplicationEndpointWithMultipleWAL.java |   2 +
 ...onKillMasterRSCompressedWithMultipleWAL.java |   2 +
 ...estReplicationSyncUpToolWithMultipleWAL.java |   2 +
 .../wal/TestBoundedRegionGroupingStrategy.java  |   1 -
 6 files changed, 83 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2b755627/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index 0aeaccf..a725989 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -23,29 +23,26 @@ import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIM
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 // imports for classes still in regionserver.wal
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.IdReadWriteLock;
 
 /**
  * A WAL Provider that returns a WAL per group of regions.
  *
+ * This provider follows the decorator pattern and mainly holds the logic for WAL grouping.
+ * WAL creation/roll/close is delegated to {@link #DELEGATE_PROVIDER}
+ *
  * Region grouping is handled via {@link RegionGroupingStrategy} and can be configured via the
  * property "hbase.wal.regiongrouping.strategy". Current strategy choices are
  * <ul>
@@ -57,7 +54,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
  * Optionally, a FQCN to a custom implementation may be given.
  */
 @InterfaceAudience.Private
-class RegionGroupingProvider implements WALProvider {
+public class RegionGroupingProvider implements WALProvider {
   private static final Log LOG = LogFactory.getLog(RegionGroupingProvider.class);
 
   /**
@@ -124,22 +121,23 @@ class RegionGroupingProvider implements WALProvider {
   public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
   public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
 
+  /** delegate provider for WAL creation/roll/close */
+  public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider";
+  public static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider
+      .name();
+
   private static final String META_WAL_GROUP_NAME = "meta";
 
-  /** A group-wal mapping, recommended to make sure one-one rather than many-one mapping */
-  protected final Map<String, FSHLog> cached = new HashMap<String, FSHLog>();
-  /** Stores unique wals generated by this RegionGroupingProvider */
-  private final Set<FSHLog> logs = Collections.synchronizedSet(new HashSet<FSHLog>());
+  /** A group-provider mapping, make sure one-one rather than many-one mapping */
+  private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
 
-  /**
-   * we synchronize on walCacheLock to prevent wal recreation in different threads
-   */
-  final Object walCacheLock = new Object();
+  private final IdReadWriteLock createLock = new IdReadWriteLock();
 
-  protected RegionGroupingStrategy strategy = null;
+  private RegionGroupingStrategy strategy = null;
+  private WALFactory factory = null;
   private List<WALActionsListener> listeners = null;
   private String providerId = null;
-  private Configuration conf = null;
+  private Class<? extends WALProvider> providerClass;
 
   @Override
   public void init(final WALFactory factory, final Configuration conf,
@@ -147,6 +145,7 @@ class RegionGroupingProvider implements WALProvider {
     if (null != strategy) {
       throw new IllegalStateException("WALProvider.init should only be called once.");
     }
+    this.factory = factory;
     this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners);
     StringBuilder sb = new StringBuilder().append(factory.factoryId);
     if (providerId != null) {
@@ -158,45 +157,33 @@ class RegionGroupingProvider implements WALProvider {
     }
     this.providerId = sb.toString();
     this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
-    this.conf = conf;
+    this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER);
   }
 
-  /**
-   * Populate the cache for this group.
-   */
-  FSHLog populateCache(String groupName) throws IOException {
-    boolean isMeta = META_WAL_PROVIDER_ID.equals(providerId);
-    String hlogPrefix;
-    List<WALActionsListener> listeners;
-    if (isMeta) {
-      hlogPrefix = this.providerId;
-      // don't watch log roll for meta
-      listeners = Collections.<WALActionsListener> singletonList(new MetricsWAL());
+  private WALProvider createProvider(String group) throws IOException {
+    if (META_WAL_PROVIDER_ID.equals(providerId)) {
+      return factory.createProvider(providerClass, listeners, META_WAL_PROVIDER_ID);
     } else {
-      hlogPrefix = groupName;
-      listeners = this.listeners;
+      return factory.createProvider(providerClass, listeners, group);
     }
-    FSHLog log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
-        DefaultWALProvider.getWALDirectoryName(providerId), HConstants.HREGION_OLDLOGDIR_NAME,
-        conf, listeners, true, hlogPrefix, isMeta ? META_WAL_PROVIDER_ID : null);
-    cached.put(groupName, log);
-    logs.add(log);
-    return log;
   }
 
   private WAL getWAL(final String group) throws IOException {
-    WAL log = cached.get(group);
-    if (null == log) {
-      // only lock when need to create wal, and need to lock since
-      // creating hlog on fs is time consuming
-      synchronized (this.walCacheLock) {
-        log = cached.get(group);// check again
-        if (null == log) {
-          log = populateCache(group);
+    WALProvider provider = cached.get(group);
+    if (provider == null) {
+      Lock lock = createLock.getLock(group.hashCode()).writeLock();
+      lock.lock();
+      try {
+        provider = cached.get(group);
+        if (provider == null) {
+          provider = createProvider(group);
+          cached.put(group, provider);
         }
+      } finally {
+        lock.unlock();
       }
     }
-    return log;
+    return provider.getWAL(null, null);
   }
 
   @Override
@@ -214,15 +201,15 @@ class RegionGroupingProvider implements WALProvider {
   public void shutdown() throws IOException {
     // save the last exception and rethrow
     IOException failure = null;
-    synchronized (logs) {
-      for (FSHLog wal : logs) {
-        try {
-          wal.shutdown();
-        } catch (IOException exception) {
-          LOG.error("Problem shutting down log '" + wal + "': " + exception.getMessage());
-          LOG.debug("Details of problem shutting down log '" + wal + "'", exception);
-          failure = exception;
+    for (WALProvider provider: cached.values()) {
+      try {
+        provider.shutdown();
+      } catch (IOException e) {
+        LOG.error("Problem shutting down wal provider '" + provider + "': " + e.getMessage());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Details of problem shutting down wal provider '" + provider + "'", e);
         }
+        failure = e;
       }
     }
     if (failure != null) {
@@ -234,15 +221,15 @@ class RegionGroupingProvider implements WALProvider {
   public void close() throws IOException {
     // save the last exception and rethrow
     IOException failure = null;
-    synchronized (logs) {
-      for (FSHLog wal : logs) {
-        try {
-          wal.close();
-        } catch (IOException exception) {
-          LOG.error("Problem closing log '" + wal + "': " + exception.getMessage());
-          LOG.debug("Details of problem closing wal '" + wal + "'", exception);
-          failure = exception;
+    for (WALProvider provider : cached.values()) {
+      try {
+        provider.close();
+      } catch (IOException e) {
+        LOG.error("Problem closing wal provider '" + provider + "': " + e.getMessage());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Details of problem closing wal provider '" + provider + "'", e);
         }
+        failure = e;
       }
     }
     if (failure != null) {
@@ -262,10 +249,8 @@ class RegionGroupingProvider implements WALProvider {
   @Override
   public long getNumLogFiles() {
     long numLogFiles = 0;
-    synchronized (logs) {
-      for (FSHLog wal : logs) {
-        numLogFiles += wal.getNumLogFiles();
-      }
+    for (WALProvider provider : cached.values()) {
+      numLogFiles += provider.getNumLogFiles();
     }
     return numLogFiles;
   }
@@ -273,12 +258,9 @@ class RegionGroupingProvider implements WALProvider {
   @Override
   public long getLogFileSize() {
     long logFileSize = 0;
-    synchronized (logs) {
-      for (FSHLog wal : logs) {
-        logFileSize += wal.getLogFileSize();
-      }
+    for (WALProvider provider : cached.values()) {
+      logFileSize += provider.getLogFileSize();
     }
     return logFileSize;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2b755627/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 7dd3e7d..8ef3cab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -127,38 +127,46 @@ public class WALFactory {
     factoryId = SINGLETON_ID;
   }
 
-  /**
-   * instantiate a provider from a config property.
-   * requires conf to have already been set (as well as anything the provider might need to read).
-   */
-  WALProvider getProvider(final String key, final String defaultValue,
-      final List<WALActionsListener> listeners, final String providerId) throws IOException {
-    Class<? extends WALProvider> clazz;
+  Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
     try {
-      clazz = Providers.valueOf(conf.get(key, defaultValue)).clazz;
+      return Providers.valueOf(conf.get(key, defaultValue)).clazz;
     } catch (IllegalArgumentException exception) {
       // Fall back to them specifying a class name
       // Note that the passed default class shouldn't actually be used, since the above only fails
       // when there is a config value present.
-      clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
+      return conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
     }
+  }
+
+  WALProvider createProvider(Class<? extends WALProvider> clazz,
+      List<WALActionsListener> listeners, String providerId) throws IOException {
     LOG.info("Instantiating WALProvider of type " + clazz);
     try {
       final WALProvider result = clazz.newInstance();
       result.init(this, conf, listeners, providerId);
       return result;
     } catch (InstantiationException exception) {
-      LOG.error("couldn't set up WALProvider, check config key " + key);
+      LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
       LOG.debug("Exception details for failure to load WALProvider.", exception);
       throw new IOException("couldn't set up WALProvider", exception);
     } catch (IllegalAccessException exception) {
-      LOG.error("couldn't set up WALProvider, check config key " + key);
+      LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
       LOG.debug("Exception details for failure to load WALProvider.", exception);
       throw new IOException("couldn't set up WALProvider", exception);
     }
   }
 
   /**
+   * instantiate a provider from a config property.
+   * requires conf to have already been set (as well as anything the provider might need to read).
+   */
+  WALProvider getProvider(final String key, final String defaultValue,
+      final List<WALActionsListener> listeners, final String providerId) throws IOException {
+    Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue);
+    return createProvider(clazz, listeners, providerId);
+  }
+
+  /**
    * @param conf must not be null, will keep a reference to read params in later reader/writer
    *     instances.
    * @param listeners may be null. will be given to all created wals (and not meta-wals)

http://git-wip-us.apache.org/repos/asf/hbase/blob/2b755627/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
index c4da4a3..34a0117 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.multiwal;
 
 import org.apache.hadoop.hbase.replication.TestReplicationEndpoint;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
@@ -28,6 +29,7 @@ public class TestReplicationEndpointWithMultipleWAL extends TestReplicationEndpo
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
     TestReplicationEndpoint.setUpBeforeClass();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2b755627/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
index bbe17b2..8b01df3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.multiwal;
 
 import org.apache.hadoop.hbase.replication.TestReplicationKillMasterRSCompressed;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
@@ -29,6 +30,7 @@ public class TestReplicationKillMasterRSCompressedWithMultipleWAL extends
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
     TestReplicationKillMasterRSCompressed.setUpBeforeClass();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2b755627/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
index 64b7919..2c471f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.multiwal;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
@@ -29,6 +30,7 @@ public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyn
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
+    conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
     TestReplicationBase.setUpBeforeClass();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2b755627/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
index 0991807..a455cdb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;


Mime
View raw message