hbase-commits mailing list archives

From jeffr...@apache.org
Subject svn commit: r1499925 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ hbase-it/src/test/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hbase/master/ hbase-server/src/main/java/org/apache/hadoop...
Date Fri, 05 Jul 2013 07:44:00 GMT
Author: jeffreyz
Date: Fri Jul  5 07:44:00 2013
New Revision: 1499925

URL: http://svn.apache.org/r1499925
Log:
HBASE-8729: distributedLogReplay may hang during chained region server failure

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java

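The gist of the change: WAL replay for a dead region server moves off the ServerShutdownHandler (SSH) threads onto a dedicated M_LOG_REPLAY_OPS executor, so that a second, chained RS failure cannot tie up all SSH handlers and hang recovery. A rough, self-contained sketch of that hand-off, using hypothetical class and method names and plain java.util.concurrent rather than the HBase classes in the diffs below:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: the shutdown path just schedules replay work
// and returns, while a separate pool (the analogue of M_LOG_REPLAY_OPS) runs it.
public class LogReplayHandOffSketch {
  private final ExecutorService shutdownPool = Executors.newFixedThreadPool(3);
  private final ExecutorService logReplayPool = Executors.newFixedThreadPool(10);

  void onRegionServerDeath(String serverName) {
    shutdownPool.submit(() -> {
      // ... re-assign the dead server's regions here; never block on replay ...
      logReplayPool.submit(() -> replayLogs(serverName));
    });
  }

  void replayLogs(String serverName) {
    try {
      // ... replay WAL edits to the regions' new locations ...
      System.out.println("replayed logs of " + serverName);
    } catch (RuntimeException e) {
      // if replay fails (e.g. the receiving RS died too), resubmit instead of
      // holding a shutdown-handler thread; a real implementation bounds retries
      logReplayPool.submit(() -> replayLogs(serverName));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    LogReplayHandOffSketch sketch = new LogReplayHandOffSketch();
    sketch.onRegionServerDeath("rs-a.example.com,60020,1372990000000");
    sketch.shutdownPool.shutdown();
    sketch.shutdownPool.awaitTermination(5, TimeUnit.SECONDS);
    sketch.logReplayPool.shutdown();
    sketch.logReplayPool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
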
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java?rev=1499925&r1=1499924&r2=1499925&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java Fri Jul  5 07:44:00 2013
@@ -223,6 +223,13 @@ public enum EventType {
    * Master is processing recovery of regions found in ZK RIT
    */
   M_MASTER_RECOVERY         (73, ExecutorType.MASTER_SERVER_OPERATIONS),
+  /**
+   * Master controlled events to be executed on the master.<br>
+   * 
+   * M_LOG_REPLAY<br>
+   * Master is processing log replay of failed region server
+   */
+  M_LOG_REPLAY              (74, ExecutorType.M_LOG_REPLAY_OPS),
 
   /**
    * RS controlled events to be executed on the RS.<br>

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java?rev=1499925&r1=1499924&r2=1499925&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java Fri Jul  5 07:44:00 2013
@@ -35,6 +35,7 @@ public enum ExecutorType {
   MASTER_TABLE_OPERATIONS    (4),
   MASTER_RS_SHUTDOWN         (5),
   MASTER_META_SERVER_OPERATIONS (6),
+  M_LOG_REPLAY_OPS           (7),
 
   // RegionServer executor services
   RS_OPEN_REGION             (20),

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java?rev=1499925&r1=1499924&r2=1499925&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java Fri Jul  5 07:44:00 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.ChaosMonkey;
 import org.junit.After;
 import org.junit.Before;
@@ -43,8 +44,18 @@ public class IntegrationTestDataIngestWi
   @Before
   public void setUp() throws Exception {
     util= getTestingUtil(null);
+    Configuration conf = util.getConfiguration();
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 25);
+    if (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
+      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG)) {
+      // when distributedLogReplay is enabled, we need to make sure rpc timeout & retries are
+      // small enough so that the replay can complete before ChaosMonkey kills another region
+      // server
+      conf.setInt("hbase.regionserver.handler.count", 20);
+      conf.setInt("hbase.log.replay.retries.number", 2);
+      conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
+    } 
     if(!util.isDistributedCluster()) {
-      // In MiniCluster mode, we increase number of RS a little bit to speed the test
       NUM_SLAVES_BASE = 5;
     }
     super.setUp(NUM_SLAVES_BASE);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1499925&r1=1499924&r2=1499925&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Fri Jul  5 07:44:00 2013
@@ -1096,9 +1096,11 @@ MasterServices, Server {
    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
       conf.getInt("hbase.master.executor.closeregion.threads", 5));
    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
-      conf.getInt("hbase.master.executor.serverops.threads", 3));
+      conf.getInt("hbase.master.executor.serverops.threads", 5));
    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
       conf.getInt("hbase.master.executor.serverops.threads", 5));
+   this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
+      conf.getInt("hbase.master.executor.logreplayops.threads", 10));
 
    // We depend on there being only one instance of this executor running
    // at a time.  To do concurrency, would need fencing of enable/disable of

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1499925&r1=1499924&r2=1499925&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Fri Jul  5 07:44:00 2013
@@ -319,21 +319,33 @@ public class MasterFileSystem {
 
   private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
     List<Path> logDirs = new ArrayList<Path>();
-    for (ServerName serverName: serverNames) {
-      Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
-      Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
-      // Rename the directory so a rogue RS doesn't create more HLogs
-      if (fs.exists(logDir)) {
-        if (!this.fs.rename(logDir, splitDir)) {
-          throw new IOException("Failed fs.rename for log split: " + logDir);
+    boolean needReleaseLock = false;
+    if (!this.services.isInitialized()) {
+      // during master initialization, we could have multiple places splitting the same wal
+      this.splitLogLock.lock();
+      needReleaseLock = true;
+    }
+    try {
+      for (ServerName serverName : serverNames) {
+        Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
+        Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
+        // Rename the directory so a rogue RS doesn't create more HLogs
+        if (fs.exists(logDir)) {
+          if (!this.fs.rename(logDir, splitDir)) {
+            throw new IOException("Failed fs.rename for log split: " + logDir);
+          }
+          logDir = splitDir;
+          LOG.debug("Renamed region directory: " + splitDir);
+        } else if (!fs.exists(splitDir)) {
+          LOG.info("Log dir for server " + serverName + " does not exist");
+          continue;
         }
-        logDir = splitDir;
-        LOG.debug("Renamed region directory: " + splitDir);
-      } else if (!fs.exists(splitDir)) {
-        LOG.info("Log dir for server " + serverName + " does not exist");
-        continue;
+        logDirs.add(splitDir);
+      }
+    } finally {
+      if (needReleaseLock) {
+        this.splitLogLock.unlock();
       }
-      logDirs.add(splitDir);
     }
     return logDirs;
   }

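The getLogDirs() change above grabs splitLogLock only while the master is still initializing (when several callers can race to rename the same WAL directories) and releases it in a finally block keyed off needReleaseLock. A minimal, self-contained sketch of that conditional-lock pattern, with hypothetical names and plain java.util.concurrent rather than the HBase code:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of the "lock only while initializing" pattern:
// serialize directory renames only in the window where races are possible.
public class ConditionalLockSketch {
  private final ReentrantLock splitLock = new ReentrantLock();
  private volatile boolean initialized = false;

  List<String> collectLogDirs(List<String> servers) {
    boolean needReleaseLock = false;
    if (!initialized) {
      // during startup several code paths may try to rename the same log dirs
      splitLock.lock();
      needReleaseLock = true;
    }
    try {
      List<String> dirs = new ArrayList<>();
      for (String server : servers) {
        dirs.add("/hbase/.logs/" + server + "-splitting"); // rename would happen here
      }
      return dirs;
    } finally {
      if (needReleaseLock) {
        splitLock.unlock(); // unlock only if this thread actually locked
      }
    }
  }

  public static void main(String[] args) {
    ConditionalLockSketch s = new ConditionalLockSketch();
    System.out.println(s.collectLogDirs(List.of("rs-a,60020,1", "rs-b,60020,2")));
  }
}
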
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java?rev=1499925&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/LogReplayHandler.java Fri Jul  5 07:44:00 2013
@@ -0,0 +1,88 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.DeadServer;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.MasterServices;
+
+/**
+ * Handle logReplay work from SSH. The separate handler keeps SSH from being blocked while it
+ * re-assigns regions from dead servers; otherwise available SSH handlers could be tied up by
+ * logReplay work (from {@link MasterFileSystem#splitLog(ServerName)}). During logReplay, if a
+ * receiving RS (say A) fails again, regions on A cannot be assigned to another live RS, which
+ * leaves the log replay unable to complete because WAL edits replay needs the receiving RS alive.
+ */
+@InterfaceAudience.Private
+public class LogReplayHandler extends EventHandler {
+  private static final Log LOG = LogFactory.getLog(LogReplayHandler.class);
+  private final ServerName serverName;
+  protected final Server master;
+  protected final MasterServices services;
+  protected final DeadServer deadServers;
+
+  public LogReplayHandler(final Server server, final MasterServices services,
+      final DeadServer deadServers, final ServerName serverName) {
+    super(server, EventType.M_LOG_REPLAY);
+    this.master = server;
+    this.services = services;
+    this.deadServers = deadServers;
+    this.serverName = serverName;
+    this.deadServers.add(serverName);
+  }
+
+  @Override
+  public String toString() {
+    String name = serverName.toString();
+    return getClass().getSimpleName() + "-" + name + "-" + getSeqid();
+  }
+
+  @Override
+  public void process() throws IOException {
+    try {
+      if (this.master != null && this.master.isStopped()) {
+        // we're exiting ...
+        return;
+      }
+      this.services.getMasterFileSystem().splitLog(serverName);
+    } catch (Exception ex) {
+      if (ex instanceof IOException) {
+        // resubmit log replay work when failed
+        this.services.getExecutorService().submit((LogReplayHandler) this);
+        this.deadServers.add(serverName);
+        throw new IOException("failed log replay for " + serverName + ", will retry", ex);
+      } else {
+        throw new IOException(ex);
+      }
+    } finally {
+      this.deadServers.finish(serverName);
+    }
+    // logReplay is the last step of SSH so log a line to indicate that
+    LOG.info("Finished processing of shutdown of " + serverName);
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=1499925&r1=1499924&r2=1499925&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java Fri Jul  5 07:44:00 2013
@@ -115,6 +115,7 @@ public class ServerShutdownHandler exten
 
   @Override
   public void process() throws IOException {
+    boolean hasLogReplayWork = false;
     final ServerName serverName = this.serverName;
     try {
 
@@ -280,7 +281,10 @@ public class ServerShutdownHandler exten
                   + " didn't complete assignment in time");
             }
           }
-          this.services.getMasterFileSystem().splitLog(serverName);
+          // submit logReplay work
+          this.services.getExecutorService().submit(
+            new LogReplayHandler(this.server, this.services, this.deadServers, this.serverName));
+          hasLogReplayWork = true;
         }
       } catch (Exception ex) {
         if (ex instanceof IOException) {
@@ -293,7 +297,9 @@ public class ServerShutdownHandler exten
       this.deadServers.finish(serverName);
     }
 
-    LOG.info("Finished processing of shutdown of " + serverName);
+    if (!hasLogReplayWork) {
+      LOG.info("Finished processing of shutdown of " + serverName);
+    }
   }
 
   private void resubmit(final ServerName serverName, IOException ex) throws IOException {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1499925&r1=1499924&r2=1499925&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Jul  5 07:44:00 2013
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HealthCheckChore;
@@ -1577,8 +1578,10 @@ public class HRegionServer implements Cl
     // quite a while inside HConnection layer. The worker won't be available for other
     // tasks even after current task is preempted after a split task times out.
     Configuration sinkConf = HBaseConfiguration.create(conf);
-    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER - 2);
+    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 
+      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
+    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
     this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
     splitLogWorker.start();
@@ -3976,11 +3979,21 @@ public class HRegionServer implements Cl
           case SUCCESS:
             break;
         }
+        if (isReplay && codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+          // in replay mode, we only need to capture the first error because we will retry the whole
+          // batch when an error happens
+          break;
+        }
       }
     } catch (IOException ie) {
       ActionResult result = ResponseConverter.buildActionResult(ie);
       for (int i = 0; i < mutations.size(); i++) {
         builder.setResult(i, result);
+        if (isReplay) {
+          // in replay mode, we only need to capture the first error because we will retry the whole
+          // batch when an error happens
+          break;
+        }
       }
     }
     long after = EnvironmentEdgeManager.currentTimeMillis();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1499925&r1=1499924&r2=1499925&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Fri Jul  5 07:44:00 2013
@@ -171,9 +171,8 @@ public class SplitLogWorker extends ZooK
       this.watcher.registerListener(this);
       // initialize a new connection for splitlogworker configuration
       HConnectionManager.getConnection(conf);
-      int res;
       // wait for master to create the splitLogZnode
-      res = -1;
+      int res = -1;
       while (res == -1 && !exitWorker) {
         try {
           res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
@@ -386,12 +385,9 @@ public class SplitLogWorker extends ZooK
         case RESIGNED:
           if (exitWorker) {
             LOG.info("task execution interrupted because worker is exiting " + path);
-            endTask(new SplitLogTask.Resigned(this.serverName),
-              SplitLogCounters.tot_wkr_task_resigned);
-          } else {
-            SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
-            LOG.info("task execution interrupted via zk by manager " + path);
           }
+          endTask(new SplitLogTask.Resigned(this.serverName), 
+            SplitLogCounters.tot_wkr_task_resigned);
           break;
       }
     } finally {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1499925&r1=1499924&r2=1499925&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Fri Jul  5 07:44:00 2013
@@ -546,7 +546,6 @@ public class HLogSplitter {
         lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
         if (lastFlushedSequenceId == null) {
           if (this.distributedLogReplay) {
-            lastFlushedSequenceId = -1L;
             RegionStoreSequenceIds ids =
                SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
             if (ids != null) {
@@ -555,11 +554,10 @@ public class HLogSplitter {
           } else if (sequenceIdChecker != null) {
             lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
           }
-          if (lastFlushedSequenceId != null && lastFlushedSequenceId >= 0) {
-            lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
-          } else {
+          if (lastFlushedSequenceId == null) {
             lastFlushedSequenceId = -1L;
           }
+          lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
         }
         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
           editsSkipped++;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java?rev=1499925&r1=1499924&r2=1499925&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java Fri Jul  5 07:44:00 2013
@@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.client.Ro
 import org.apache.hadoop.hbase.client.ServerCallable;
 import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
 import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -60,6 +62,7 @@ import com.google.protobuf.ServiceExcept
 public class WALEditsReplaySink {
 
   private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
+  private static final int MAX_BATCH_SIZE = 3000;
 
   private final Configuration conf;
   private final HConnection conn;
@@ -67,6 +70,7 @@ public class WALEditsReplaySink {
   private final MetricsWALEditsReplay metrics;
   private final AtomicLong totalReplayedEdits = new AtomicLong();
   private final boolean skipErrors;
+  private final int replayTimeout;
 
   /**
    * Create a sink for WAL log entries replay
@@ -83,6 +87,8 @@ public class WALEditsReplaySink {
     this.tableName = tableName;
     this.skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
       HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
+    // timeout for a single replay operation; default is 60 seconds
+    this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
   }
 
   /**
@@ -121,7 +127,18 @@ public class WALEditsReplaySink {
 
     // replaying edits by region
     for (HRegionInfo curRegion : actionsByRegion.keySet()) {
-      replayEdits(loc, curRegion, actionsByRegion.get(curRegion));
+      List<Action<Row>> allActions = actionsByRegion.get(curRegion);
+      // send edits in chunks
+      int totalActions = allActions.size();
+      int replayedActions = 0;
+      int curBatchSize = 0;
+      for (; replayedActions < totalActions;) {
+        curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE
+                : (totalActions - replayedActions);
+        replayEdits(loc, curRegion, allActions.subList(replayedActions, 
+          replayedActions + curBatchSize));
+        replayedActions += curBatchSize;
+      }
     }
 
     long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -173,7 +190,7 @@ public class WALEditsReplaySink {
     ReplayServerCallable(final HConnection connection, final byte [] tableName, 
         final HRegionLocation regionLoc, final HRegionInfo regionInfo,
         final List<Action<Row>> actions) {
-      super(connection, tableName, null);
+      super(connection, tableName, null, replayTimeout);
       this.actions = actions;
       this.regionInfo = regionInfo;
       this.location = regionLoc;

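The WALEditsReplaySink change replays each region's edits in chunks of at most MAX_BATCH_SIZE (3000) actions per call instead of one oversized request, and passes a configurable replayTimeout to each ReplayServerCallable. A small, self-contained sketch of the same subList-based chunking loop, with hypothetical names in plain Java rather than the HBase classes:

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of chunked replay: send at most MAX_BATCH_SIZE
// actions per call so a single replay request stays small and bounded.
public class ChunkedReplaySketch {
  private static final int MAX_BATCH_SIZE = 3000;

  static void replayInChunks(List<Integer> allActions) {
    int totalActions = allActions.size();
    int replayedActions = 0;
    while (replayedActions < totalActions) {
      int curBatchSize = Math.min(MAX_BATCH_SIZE, totalActions - replayedActions);
      List<Integer> batch = allActions.subList(replayedActions, replayedActions + curBatchSize);
      sendBatch(batch); // stands in for replayEdits(loc, region, batch)
      replayedActions += curBatchSize;
    }
  }

  static void sendBatch(List<Integer> batch) {
    System.out.println("replaying batch of " + batch.size() + " edits");
  }

  public static void main(String[] args) {
    List<Integer> edits = new ArrayList<>();
    for (int i = 0; i < 7500; i++) {
      edits.add(i);
    }
    replayInChunks(edits); // prints batches of 3000, 3000, 1500
  }
}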

