hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-14306 Refine RegionGroupingProvider: fix issues and make it more scalable (Yu Li)
Date Mon, 14 Sep 2015 14:27:52 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 8354ed69e -> 576792789


HBASE-14306 Refine RegionGroupingProvider: fix issues and make it more scalable (Yu Li)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/57679278
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/57679278
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/57679278

Branch: refs/heads/branch-1
Commit: 57679278982fc8e6f7cb2d1c31c9aaf636bd57da
Parents: 8354ed6
Author: tedyu <yuzhihong@gmail.com>
Authored: Mon Sep 14 07:27:45 2015 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Mon Sep 14 07:27:45 2015 -0700

----------------------------------------------------------------------
 .../MetricsRegionServerWrapperImpl.java         |  13 +-
 .../hbase/wal/BoundedGroupingStrategy.java      |  67 +++++++
 .../wal/BoundedRegionGroupingProvider.java      | 159 ----------------
 .../hadoop/hbase/wal/DefaultWALProvider.java    |  28 +--
 .../hadoop/hbase/wal/DisabledWALProvider.java   |  10 +
 .../hbase/wal/RegionGroupingProvider.java       |  92 ++++++---
 .../org/apache/hadoop/hbase/wal/WALFactory.java |  10 +-
 .../apache/hadoop/hbase/wal/WALProvider.java    |  10 +
 .../apache/hadoop/hbase/wal/IOTestProvider.java |  10 +
 .../wal/TestBoundedRegionGroupingProvider.java  | 186 ------------------
 .../wal/TestBoundedRegionGroupingStrategy.java  | 189 +++++++++++++++++++
 11 files changed, 372 insertions(+), 402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 0a18fef..efad74c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -34,8 +34,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
-import org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.metrics2.MetricsExecutor;
@@ -534,10 +533,12 @@ class MetricsRegionServerWrapperImpl
       }
       lastRan = currentTime;
 
-      numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.walFactory) +
-          BoundedRegionGroupingProvider.getNumLogFiles(regionServer.walFactory);
-      walFileSize = DefaultWALProvider.getLogFileSize(regionServer.walFactory) +
-          BoundedRegionGroupingProvider.getLogFileSize(regionServer.walFactory);
+      WALProvider provider = regionServer.walFactory.getWALProvider();
+      WALProvider metaProvider = regionServer.walFactory.getMetaWALProvider();
+      numWALFiles = (provider == null ? 0 : provider.getNumLogFiles()) +
+          (metaProvider == null ? 0 : metaProvider.getNumLogFiles());
+      walFileSize = (provider == null ? 0 : provider.getLogFileSize()) +
+          (metaProvider == null ? 0 : metaProvider.getLogFileSize());
       //Copy over computed values so that no thread sees half computed values.
       numStores = tempNumStores;
       numStoreFiles = tempNumStoreFiles;

http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
new file mode 100644
index 0000000..14c5594
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy;
+
+/**
+ * A WAL grouping strategy that limits the number of delegate providers (i.e. wal group) to
+ * "hbase.wal.regiongrouping.numgroups".
+ */
+@InterfaceAudience.Private
+public class BoundedGroupingStrategy implements RegionGroupingStrategy{
+
+  static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups";
+  static final int DEFAULT_NUM_REGION_GROUPS = 2;
+
+  private ConcurrentHashMap<String, String> groupNameCache =
+      new ConcurrentHashMap<String, String>();
+  private AtomicInteger counter = new AtomicInteger(0);
+  private String[] groupNames;
+
+  @Override
+  public String group(byte[] identifier) {
+    String idStr = Bytes.toString(identifier);
+    String groupName = groupNameCache.get(idStr);
+    if (null == groupName) {
+      groupName = groupNames[counter.getAndIncrement() % groupNames.length];
+      String extantName = groupNameCache.putIfAbsent(idStr, groupName);
+      if (extantName != null) {
+        return extantName;
+      }
+    }
+    return groupName;
+  }
+
+  @Override
+  public void init(Configuration config, String providerId) {
+    int regionGroupNumber = config.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
+    groupNames = new String[regionGroupNumber];
+    for (int i = 0; i < regionGroupNumber; i++) {
+      groupNames[i] = providerId + GROUP_NAME_DELIMITER + "regiongroup-" + i;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java
deleted file mode 100644
index e1417b2..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-// imports for classes still in regionserver.wal
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-
-/**
- * A WAL Provider that pre-creates N WALProviders and then limits our grouping strategy to them.
- * Control the number of delegate providers via "hbase.wal.regiongrouping.numgroups." Control
- * the choice of delegate provider implementation and the grouping strategy the same as
- * {@link RegionGroupingProvider}.
- */
-@InterfaceAudience.Private
-public class BoundedRegionGroupingProvider extends RegionGroupingProvider {
-  private static final Log LOG = LogFactory.getLog(BoundedRegionGroupingProvider.class);
-
-  static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups";
-  static final int DEFAULT_NUM_REGION_GROUPS = 2;
-  private WALProvider[] delegates;
-  private AtomicInteger counter = new AtomicInteger(0);
-
-  @Override
-  public void init(final WALFactory factory, final Configuration conf,
-      final List<WALActionsListener> listeners, final String providerId) throws IOException {
-    super.init(factory, conf, listeners, providerId);
-    // no need to check for and close down old providers; our parent class will throw on re-invoke
-    delegates = new WALProvider[Math.max(1, conf.getInt(NUM_REGION_GROUPS,
-        DEFAULT_NUM_REGION_GROUPS))];
-    for (int i = 0; i < delegates.length; i++) {
-      delegates[i] = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners,
-          providerId + i);
-    }
-    LOG.info("Configured to run with " + delegates.length + " delegate WAL providers.");
-  }
-
-  @Override
-  WALProvider populateCache(final byte[] group) {
-    final WALProvider temp = delegates[counter.getAndIncrement() % delegates.length];
-    final WALProvider extant = cached.putIfAbsent(group, temp);
-    // if someone else beat us to initializing, just take what they set.
-    // note that in such a case we skew load away from the provider we picked at first
-    return extant == null ? temp : extant;
-  }
-
-  @Override
-  public void shutdown() throws IOException {
-    // save the last exception and rethrow
-    IOException failure = null;
-    for (WALProvider provider : delegates) {
-      try {
-        provider.shutdown();
-      } catch (IOException exception) {
-        LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
-        LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
-        failure = exception;
-      }
-    }
-    if (failure != null) {
-      throw failure;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    // save the last exception and rethrow
-    IOException failure = null;
-    for (WALProvider provider : delegates) {
-      try {
-        provider.close();
-      } catch (IOException exception) {
-        LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
-        LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
-        failure = exception;
-      }
-    }
-    if (failure != null) {
-      throw failure;
-    }
-  }
-
-  /**
-   * iff the given WALFactory is using the BoundedRegionGroupingProvider for meta and/or non-meta,
-   * count the number of files (rolled and active). if either of them isn't, count 0
-   * for that provider.
-   * @param walFactory may not be null.
-   */
-  public static long getNumLogFiles(WALFactory walFactory) {
-    long result = 0;
-    if (walFactory.provider instanceof BoundedRegionGroupingProvider) {
-      BoundedRegionGroupingProvider groupProviders =
-          (BoundedRegionGroupingProvider)walFactory.provider;
-      for (int i = 0; i < groupProviders.delegates.length; i++) {
-        result +=
-            ((FSHLog)((DefaultWALProvider)(groupProviders.delegates[i])).log).getNumLogFiles();
-      }
-    }
-    WALProvider meta = walFactory.metaProvider.get();
-    if (meta instanceof BoundedRegionGroupingProvider) {
-      for (int i = 0; i < ((BoundedRegionGroupingProvider)meta).delegates.length; i++) {
-        result += ((FSHLog)
-            ((DefaultWALProvider)(((BoundedRegionGroupingProvider)meta).delegates[i])).log)
-            .getNumLogFiles();
-      }
-    }
-    return result;
-  }
-
-  /**
-   * iff the given WALFactory is using the BoundedRegionGroupingProvider for meta and/or non-meta,
-   * count the size of files (rolled and active). if either of them isn't, count 0
-   * for that provider.
-   * @param walFactory may not be null.
-   */
-  public static long getLogFileSize(WALFactory walFactory) {
-    long result = 0;
-    if (walFactory.provider instanceof BoundedRegionGroupingProvider) {
-      BoundedRegionGroupingProvider groupProviders =
-          (BoundedRegionGroupingProvider)walFactory.provider;
-      for (int i = 0; i < groupProviders.delegates.length; i++) {
-        result +=
-            ((FSHLog)((DefaultWALProvider)(groupProviders.delegates[i])).log).getLogFileSize();
-      }
-    }
-    WALProvider meta = walFactory.metaProvider.get();
-    if (meta instanceof BoundedRegionGroupingProvider) {
-      for (int i = 0; i < ((BoundedRegionGroupingProvider)meta).delegates.length; i++) {
-        result += ((FSHLog)
-            ((DefaultWALProvider)(((BoundedRegionGroupingProvider)meta).delegates[i])).log)
-            .getLogFileSize();
-      }
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index 661016d..d399100 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -129,36 +129,20 @@ public class DefaultWALProvider implements WALProvider {
    * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta,
    * count the number of files (rolled and active). if either of them aren't, count 0
    * for that provider.
-   * @param walFactory may not be null.
    */
-  public static long getNumLogFiles(WALFactory walFactory) {
-    long result = 0;
-    if (walFactory.provider instanceof DefaultWALProvider) {
-      result += ((FSHLog)((DefaultWALProvider)walFactory.provider).log).getNumLogFiles();
-    }
-    WALProvider meta = walFactory.metaProvider.get();
-    if (meta instanceof DefaultWALProvider) {
-      result += ((FSHLog)((DefaultWALProvider)meta).log).getNumLogFiles();
-    }
-    return result;
+  @Override
+  public long getNumLogFiles() {
+    return this.log.getNumLogFiles();
   }
 
   /**
    * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta,
    * count the size of files (rolled and active). if either of them aren't, count 0
    * for that provider.
-   * @param walFactory may not be null.
    */
-  public static long getLogFileSize(WALFactory walFactory) {
-    long result = 0;
-    if (walFactory.provider instanceof DefaultWALProvider) {
-      result += ((FSHLog)((DefaultWALProvider)walFactory.provider).log).getLogFileSize();
-    }
-    WALProvider meta = walFactory.metaProvider.get();
-    if (meta instanceof DefaultWALProvider) {
-      result += ((FSHLog)((DefaultWALProvider)meta).log).getLogFileSize();
-    }
-    return result;
+  @Override
+  public long getLogFileSize() {
+    return this.log.getLogFileSize();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 56d17a2..1701448 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -218,4 +218,14 @@ class DisabledWALProvider implements WALProvider {
       return "WAL disabled.";
     }
   }
+
+  @Override
+  public long getNumLogFiles() {
+    return 0;
+  }
+
+  @Override
+  public long getLogFileSize() {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index eb2c426..8395818 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -32,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 
 // imports for classes still in regionserver.wal
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * A WAL Provider that returns a WAL per group of regions.
@@ -57,21 +60,23 @@ class RegionGroupingProvider implements WALProvider {
    * Map identifiers to a group number.
    */
   public static interface RegionGroupingStrategy {
+    String GROUP_NAME_DELIMITER = ".";
     /**
      * Given an identifier, pick a group.
      * the byte[] returned for a given group must always use the same instance, since we
      * will be using it as a hash key.
      */
-    byte[] group(final byte[] identifier);
-    void init(Configuration config);
+    String group(final byte[] identifier);
+    void init(Configuration config, String providerId);
   }
 
   /**
    * Maps between configuration names for strategies and implementation classes.
    */
   static enum Strategies {
-    defaultStrategy(IdentityGroupingStrategy.class),
-    identity(IdentityGroupingStrategy.class);
+    defaultStrategy(BoundedGroupingStrategy.class),
+    identity(IdentityGroupingStrategy.class),
+    bounded(BoundedGroupingStrategy.class);
 
     final Class<? extends RegionGroupingStrategy> clazz;
     Strategies(Class<? extends RegionGroupingStrategy> clazz) {
@@ -97,7 +102,7 @@ class RegionGroupingProvider implements WALProvider {
     LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
     try {
       final RegionGroupingStrategy result = clazz.newInstance();
-      result.init(conf);
+      result.init(conf, providerId);
       return result;
     } catch (InstantiationException exception) {
       LOG.error("couldn't set up region grouping strategy, check config key " +
@@ -112,14 +117,18 @@ class RegionGroupingProvider implements WALProvider {
     }
   }
 
-  private static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
-  private static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
+  public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
+  public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
 
   static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate";
   static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider.name();
 
-  protected final ConcurrentMap<byte[], WALProvider> cached =
-      new ConcurrentHashMap<byte[], WALProvider>();
+  /** A group-to-provider mapping; recommended to keep this one-to-one rather than many-to-one */
+  protected final ConcurrentMap<String, WALProvider> cached =
+      new ConcurrentHashMap<String, WALProvider>();
+  /** Stores delegate providers (no duplicates) used by this RegionGroupingProvider */
+  private final Set<WALProvider> providers = Collections
+      .synchronizedSet(new HashSet<WALProvider>());
 
 
   protected RegionGroupingStrategy strategy = null;
@@ -142,7 +151,7 @@ class RegionGroupingProvider implements WALProvider {
   /**
    * Populate the cache for this group.
    */
-  WALProvider populateCache(final byte[] group) throws IOException {
+  WALProvider populateCache(final String group) throws IOException {
     final WALProvider temp = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER,
         listeners, providerId + "-" + UUID.randomUUID());
     final WALProvider extant = cached.putIfAbsent(group, temp);
@@ -151,12 +160,13 @@ class RegionGroupingProvider implements WALProvider {
       temp.close();
       return extant;
     }
+    providers.add(temp);
     return temp;
   }
 
   @Override
   public WAL getWAL(final byte[] identifier) throws IOException {
-    final byte[] group = strategy.group(identifier);
+    final String group = strategy.group(identifier);
     WALProvider provider = cached.get(group);
     if (null == provider) {
       provider = populateCache(group);
@@ -168,13 +178,15 @@ class RegionGroupingProvider implements WALProvider {
   public void shutdown() throws IOException {
     // save the last exception and rethrow
     IOException failure = null;
-    for (WALProvider provider : cached.values()) {
-      try {
-        provider.shutdown();
-      } catch (IOException exception) {
-        LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
-        LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
-        failure = exception;
+    synchronized (providers) {
+      for (WALProvider provider : providers) {
+        try {
+          provider.shutdown();
+        } catch (IOException exception) {
+          LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
+          LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
+          failure = exception;
+        }
       }
     }
     if (failure != null) {
@@ -186,13 +198,15 @@ class RegionGroupingProvider implements WALProvider {
   public void close() throws IOException {
     // save the last exception and rethrow
     IOException failure = null;
-    for (WALProvider provider : cached.values()) {
-      try {
-        provider.close();
-      } catch (IOException exception) {
-        LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
-        LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
-        failure = exception;
+    synchronized (providers) {
+      for (WALProvider provider : providers) {
+        try {
+          provider.close();
+        } catch (IOException exception) {
+          LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
+          LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
+          failure = exception;
+        }
       }
     }
     if (failure != null) {
@@ -202,11 +216,33 @@ class RegionGroupingProvider implements WALProvider {
 
   static class IdentityGroupingStrategy implements RegionGroupingStrategy {
     @Override
-    public void init(Configuration config) {}
+    public void init(Configuration config, String providerId) {}
     @Override
-    public byte[] group(final byte[] identifier) {
-      return identifier;
+    public String group(final byte[] identifier) {
+      return Bytes.toString(identifier);
     }
   }
 
+  @Override
+  public long getNumLogFiles() {
+    long numLogFiles = 0;
+    synchronized (providers) {
+      for (WALProvider provider : providers) {
+        numLogFiles += provider.getNumLogFiles();
+      }
+    }
+    return numLogFiles;
+  }
+
+  @Override
+  public long getLogFileSize() {
+    long logFileSize = 0;
+    synchronized (providers) {
+      for (WALProvider provider : providers) {
+        logFileSize += provider.getLogFileSize();
+      }
+    }
+    return logFileSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index e44a4d1..c869a5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -76,7 +76,7 @@ public class WALFactory {
   static enum Providers {
     defaultProvider(DefaultWALProvider.class),
     filesystem(DefaultWALProvider.class),
-    multiwal(BoundedRegionGroupingProvider.class);
+    multiwal(RegionGroupingProvider.class);
 
     Class<? extends WALProvider> clazz;
     Providers(Class<? extends WALProvider> clazz) {
@@ -444,4 +444,12 @@ public class WALFactory {
       throws IOException {
     return DefaultWALProvider.createWriter(configuration, fs, path, false);
   }
+
+  public final WALProvider getWALProvider() {
+    return this.provider;
+  }
+
+  public final WALProvider getMetaWALProvider() {
+    return this.metaProvider.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index b27abf9..b4c4067 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -80,4 +80,14 @@ public interface WALProvider {
     long getLength() throws IOException;
   }
 
+  /**
+   * Get number of the log files this provider is managing
+   */
+  long getNumLogFiles();
+
+  /**
+   * Get size of the log files this provider is managing
+   */
+  long getLogFileSize();
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index d2581a1..e06a587 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -230,4 +230,14 @@ public class IOTestProvider implements WALProvider {
       }
     }
   }
+
+  @Override
+  public long getNumLogFiles() {
+    return this.log.getNumLogFiles();
+  }
+
+  @Override
+  public long getLogFileSize() {
+    return this.log.getLogFileSize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java
deleted file mode 100644
index c54e794..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.NUM_REGION_GROUPS;
-import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.DEFAULT_NUM_REGION_GROUPS;
-import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-@Category(LargeTests.class)
-public class TestBoundedRegionGroupingProvider {
-  protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingProvider.class);
-
-  @Rule
-  public TestName currentTest = new TestName();
-  protected static Configuration conf;
-  protected static FileSystem fs;
-  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  @Before
-  public void setUp() throws Exception {
-    FileStatus[] entries = fs.listStatus(new Path("/"));
-    for (FileStatus dir : entries) {
-      fs.delete(dir.getPath(), true);
-    }
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    conf = TEST_UTIL.getConfiguration();
-    // Make block sizes small.
-    conf.setInt("dfs.blocksize", 1024 * 1024);
-    // quicker heartbeat interval for faster DN death notification
-    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
-    conf.setInt("dfs.heartbeat.interval", 1);
-    conf.setInt("dfs.client.socket-timeout", 5000);
-
-    // faster failover with cluster.shutdown();fs.close() idiom
-    conf.setInt("hbase.ipc.client.connect.max.retries", 1);
-    conf.setInt("dfs.client.block.recovery.retries", 1);
-    conf.setInt("hbase.ipc.client.connection.maxidletime", 500);
-
-    conf.setClass(WAL_PROVIDER, BoundedRegionGroupingProvider.class, WALProvider.class);
-
-    TEST_UTIL.startMiniDFSCluster(3);
-
-    fs = TEST_UTIL.getDFSCluster().getFileSystem();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * Write to a log file with three concurrent threads and verifying all data is written.
-   */
-  @Test
-  public void testConcurrentWrites() throws Exception {
-    // Run the WPE tool with three threads writing 3000 edits each concurrently.
-    // When done, verify that all edits were written.
-    int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
-        new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
-    assertEquals(0, errCode);
-  }
-
-  /**
-   * Make sure we can successfully run with more regions then our bound.
-   */
-  @Test
-  public void testMoreRegionsThanBound() throws Exception {
-    final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2);
-    int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
-        new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
-            "-regions", parallelism});
-    assertEquals(0, errCode);
-  }
-
-  @Test
-  public void testBoundsGreaterThanDefault() throws Exception {
-    final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
-    try {
-      conf.setInt(NUM_REGION_GROUPS, temp*4);
-      final String parallelism = Integer.toString(temp*4);
-      int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
-          new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
-              "-regions", parallelism});
-      assertEquals(0, errCode);
-    } finally {
-      conf.setInt(NUM_REGION_GROUPS, temp);
-    }
-  }
-
-  @Test
-  public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception {
-    final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
-    try {
-      conf.setInt(NUM_REGION_GROUPS, temp*4);
-      final String parallelism = Integer.toString(temp*4*2);
-      int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
-          new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
-              "-regions", parallelism});
-      assertEquals(0, errCode);
-    } finally {
-      conf.setInt(NUM_REGION_GROUPS, temp);
-    }
-  }
-
-  /**
-   * Ensure that we can use Set.add to deduplicate WALs
-   */
-  @Test
-  public void setMembershipDedups() throws IOException {
-    final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
-    WALFactory wals = null;
-    try {
-      conf.setInt(NUM_REGION_GROUPS, temp*4);
-      // Set HDFS root directory for storing WAL
-      FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
-
-      wals = new WALFactory(conf, null, currentTest.getMethodName());
-      final Set<WAL> seen = new HashSet<WAL>(temp*4);
-      final Random random = new Random();
-      int count = 0;
-      // we know that this should see one of the wals more than once
-      for (int i = 0; i < temp*8; i++) {
-        final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt()));
-        LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL);
-        if (seen.add(maybeNewWAL)) {
-          count++;
-        }
-      }
-      assertEquals("received back a different number of WALs that are not equal() to each other " +
-          "than the bound we placed.", temp*4, count);
-    } finally {
-      if (wals != null) {
-        wals.close();
-      }
-      conf.setInt(NUM_REGION_GROUPS, temp);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/57679278/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
new file mode 100644
index 0000000..eadf31c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingStrategy.java
@@ -0,0 +1,189 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS;
+import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS;
+import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
+import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.REGION_GROUPING_STRATEGY;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({LargeTests.class})
+public class TestBoundedRegionGroupingStrategy {
+  protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingStrategy.class);
+
+  @Rule
+  public TestName currentTest = new TestName();
+  protected static Configuration conf;
+  protected static FileSystem fs;
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Before
+  public void setUp() throws Exception {
+    FileStatus[] entries = fs.listStatus(new Path("/"));
+    for (FileStatus dir : entries) {
+      fs.delete(dir.getPath(), true);
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    // Make block sizes small.
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    // quicker heartbeat interval for faster DN death notification
+    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    conf.setInt("dfs.client.socket-timeout", 5000);
+
+    // faster failover with cluster.shutdown();fs.close() idiom
+    conf.setInt("hbase.ipc.client.connect.max.retries", 1);
+    conf.setInt("dfs.client.block.recovery.retries", 1);
+    conf.setInt("hbase.ipc.client.connection.maxidletime", 500);
+
+    conf.setClass(WAL_PROVIDER, RegionGroupingProvider.class, WALProvider.class);
+    conf.set(REGION_GROUPING_STRATEGY, RegionGroupingProvider.Strategies.bounded.name());
+
+    TEST_UTIL.startMiniDFSCluster(3);
+
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Write to a log file with three concurrent threads and verifying all data is written.
+   */
+  @Test
+  public void testConcurrentWrites() throws Exception {
+    // Run the WPE tool with three threads writing 3000 edits each concurrently.
+    // When done, verify that all edits were written.
+    int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
+        new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
+    assertEquals(0, errCode);
+  }
+
+  /**
+   * Make sure we can successfully run with more regions then our bound.
+   */
+  @Test
+  public void testMoreRegionsThanBound() throws Exception {
+    final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2);
+    int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
+        new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
+            "-regions", parallelism});
+    assertEquals(0, errCode);
+  }
+
+  @Test
+  public void testBoundsGreaterThanDefault() throws Exception {
+    final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
+    try {
+      conf.setInt(NUM_REGION_GROUPS, temp*4);
+      final String parallelism = Integer.toString(temp*4);
+      int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
+          new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
+              "-regions", parallelism});
+      assertEquals(0, errCode);
+    } finally {
+      conf.setInt(NUM_REGION_GROUPS, temp);
+    }
+  }
+
+  @Test
+  public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception {
+    final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
+    try {
+      conf.setInt(NUM_REGION_GROUPS, temp*4);
+      final String parallelism = Integer.toString(temp*4*2);
+      int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
+          new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
+              "-regions", parallelism});
+      assertEquals(0, errCode);
+    } finally {
+      conf.setInt(NUM_REGION_GROUPS, temp);
+    }
+  }
+
+  /**
+   * Ensure that we can use Set.add to deduplicate WALs
+   */
+  @Test
+  public void setMembershipDedups() throws IOException {
+    final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
+    WALFactory wals = null;
+    try {
+      conf.setInt(NUM_REGION_GROUPS, temp*4);
+      // Set HDFS root directory for storing WAL
+      FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
+
+      wals = new WALFactory(conf, null, currentTest.getMethodName());
+      final Set<WAL> seen = new HashSet<WAL>(temp*4);
+      final Random random = new Random();
+      int count = 0;
+      // we know that this should see one of the wals more than once
+      for (int i = 0; i < temp*8; i++) {
+        final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt()));
+        LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL);
+        if (seen.add(maybeNewWAL)) {
+          count++;
+        }
+      }
+      assertEquals("received back a different number of WALs that are not equal() to each other " +
+          "than the bound we placed.", temp*4, count);
+    } finally {
+      if (wals != null) {
+        wals.close();
+      }
+      conf.setInt(NUM_REGION_GROUPS, temp);
+    }
+  }
+}


Mime
View raw message