Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1B53B109A0 for ; Sun, 24 Nov 2013 19:22:25 +0000 (UTC) Received: (qmail 72142 invoked by uid 500); 24 Nov 2013 19:22:24 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 72108 invoked by uid 500); 24 Nov 2013 19:22:24 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 72101 invoked by uid 99); 24 Nov 2013 19:22:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 24 Nov 2013 19:22:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 24 Nov 2013 19:22:17 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4174923888CD; Sun, 24 Nov 2013 19:21:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1545052 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ hbase-server/... 
Date: Sun, 24 Nov 2013 19:21:54 -0000 To: commits@hbase.apache.org From: jeffreyz@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131124192155.4174923888CD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jeffreyz Date: Sun Nov 24 19:21:54 2013 New Revision: 1545052 URL: http://svn.apache.org/r1545052 Log: HBASE-9736: Allow more than one log splitter per RS Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java?rev=1545052&r1=1545051&r2=1545052&view=diff ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java (original) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java Sun Nov 24 19:21:54 2013 @@ -250,7 +250,15 @@ public enum EventType { * * RS_PARALLEL_SEEK */ - RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK); + RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK), + + /** + * RS wal recovery work items(either creating recover.edits or 
directly replay wals) + * to be executed on the RS.
+ * + * RS_LOG_REPLAY + */ + RS_LOG_REPLAY (81, ExecutorType.RS_LOG_REPLAY_OPS); private final int code; private final ExecutorType executor; Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java?rev=1545052&r1=1545051&r2=1545052&view=diff ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java (original) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java Sun Nov 24 19:21:54 2013 @@ -44,7 +44,8 @@ public enum ExecutorType { RS_CLOSE_REGION (23), RS_CLOSE_ROOT (24), RS_CLOSE_META (25), - RS_PARALLEL_SEEK (26); + RS_PARALLEL_SEEK (26), + RS_LOG_REPLAY_OPS (27); ExecutorType(int value) {} Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1545052&r1=1545051&r2=1545052&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Nov 24 19:21:54 2013 @@ -1583,6 +1583,8 @@ public class HRegionServer implements Cl this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } + this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, + conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS)); Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", uncaughtExceptionHandler); Modified: 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1545052&r1=1545051&r2=1545052&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Sun Nov 24 19:21:54 2013 @@ -25,23 +25,27 @@ import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang.math.RandomUtils; +import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import 
org.apache.hadoop.hbase.util.CancelableProgressable; @@ -78,12 +82,17 @@ import org.apache.zookeeper.data.Stat; */ @InterfaceAudience.Private public class SplitLogWorker extends ZooKeeperListener implements Runnable { + public static final int DEFAULT_MAX_SPLITTERS = 2; + private static final Log LOG = LogFactory.getLog(SplitLogWorker.class); private static final int checkInterval = 5000; // 5 seconds + private static final int FAILED_TO_OWN_TASK = -1; Thread worker; private final ServerName serverName; private final TaskExecutor splitTaskExecutor; + // thread pool which executes recovery work + private final ExecutorService executorService; private final Object taskReadyLock = new Object(); volatile int taskReadySeq = 0; @@ -95,9 +104,11 @@ public class SplitLogWorker extends ZooK private final int report_period; private RegionServerServices server = null; private Configuration conf = null; + protected final AtomicInteger tasksInProgress = new AtomicInteger(0); + private int maxConcurrentTasks = 0; - public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, - RegionServerServices server, TaskExecutor splitTaskExecutor) { + public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server, + TaskExecutor splitTaskExecutor) { super(watcher); this.server = server; this.serverName = server.getServerName(); @@ -105,16 +116,9 @@ public class SplitLogWorker extends ZooK report_period = conf.getInt("hbase.splitlog.report.period", conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3); this.conf = conf; - } - - public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName, - TaskExecutor splitTaskExecutor) { - super(watcher); - this.serverName = serverName; - this.splitTaskExecutor = splitTaskExecutor; - report_period = conf.getInt("hbase.splitlog.report.period", - conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3); - this.conf = conf; + 
this.executorService = this.server.getExecutorService(); + this.maxConcurrentTasks = + conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS); } public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf, @@ -238,11 +242,18 @@ public class SplitLogWorker extends ZooK break; } } - for (int i = 0; i < paths.size(); i ++) { + int numTasks = paths.size(); + for (int i = 0; i < numTasks; i++) { int idx = (i + offset) % paths.size(); // don't call ZKSplitLog.getNodeName() because that will lead to // double encoding of the path name - grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx))); + if (this.calculateAvailableSplitters(numTasks) > 0) { + grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx))); + } else { + LOG.debug("Current region server " + this.serverName + " has " + + this.tasksInProgress.get() + " tasks in progress and can't take more."); + break; + } if (exitWorker) { return; } @@ -337,72 +348,33 @@ public class SplitLogWorker extends ZooK return; } - currentVersion = stat.getVersion(); - if (!attemptToOwnTask(true)) { + currentVersion = attemptToOwnTask(true, watcher, serverName, path, stat.getVersion()); + if (currentVersion < 0) { SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); return; } if (ZKSplitLog.isRescanNode(watcher, currentTask)) { - endTask(new SplitLogTask.Done(this.serverName), - SplitLogCounters.tot_wkr_task_acquired_rescan); + HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName), + SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion); return; } + LOG.info("worker " + serverName + " acquired task " + path); SplitLogCounters.tot_wkr_task_acquired.incrementAndGet(); getDataSetWatchAsync(); - t = System.currentTimeMillis(); - TaskExecutor.Status status; - - status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask), - new CancelableProgressable() { - - private long last_report_at = 0; + submitTask(path, 
currentVersion, this.report_period); - @Override - public boolean progress() { - long t = EnvironmentEdgeManager.currentTimeMillis(); - if ((t - last_report_at) > report_period) { - last_report_at = t; - if (!attemptToOwnTask(false)) { - LOG.warn("Failed to heartbeat the task" + currentTask); - return false; - } - } - return true; - } - }); - - switch (status) { - case DONE: - endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done); - break; - case PREEMPTED: - SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); - LOG.warn("task execution prempted " + path); - break; - case ERR: - if (!exitWorker) { - endTask(new SplitLogTask.Err(this.serverName), SplitLogCounters.tot_wkr_task_err); - break; - } - // if the RS is exiting then there is probably a tons of stuff - // that can go wrong. Resign instead of signaling error. - //$FALL-THROUGH$ - case RESIGNED: - if (exitWorker) { - LOG.info("task execution interrupted because worker is exiting " + path); - } - endTask(new SplitLogTask.Resigned(this.serverName), - SplitLogCounters.tot_wkr_task_resigned); - break; + // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks + try { + int sleepTime = RandomUtils.nextInt(500) + 500; + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.warn("Interrupted while yielding for other region servers", e); + Thread.currentThread().interrupt(); } } finally { - if (t > 0) { - LOG.info("worker " + serverName + " done with task " + path + - " in " + (System.currentTimeMillis() - t) + "ms"); - } synchronized (grabTaskLock) { workerInGrabTask = false; // clear the interrupt from stopTask() otherwise the next task will @@ -412,76 +384,109 @@ public class SplitLogWorker extends ZooK } } + /** - * Try to own the task by transitioning the zk node data from UNASSIGNED to - * OWNED. + * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED. *

- * This method is also used to periodically heartbeat the task progress by - * transitioning the node from OWNED to OWNED. + * This method is also used to periodically heartbeat the task progress by transitioning the node + * from OWNED to OWNED. *

- * @return true if task path is successfully locked + * @param isFirstTime + * @param zkw + * @param server + * @param task + * @param taskZKVersion + * @return non-negative integer value when task can be owned by current region server otherwise -1 */ - private boolean attemptToOwnTask(boolean isFirstTime) { + protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw, + ServerName server, String task, int taskZKVersion) { + int latestZKVersion = FAILED_TO_OWN_TASK; try { - SplitLogTask slt = new SplitLogTask.Owned(this.serverName); - Stat stat = - this.watcher.getRecoverableZooKeeper().setData(currentTask, slt.toByteArray(), currentVersion); + SplitLogTask slt = new SplitLogTask.Owned(server); + Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion); if (stat == null) { - LOG.warn("zk.setData() returned null for path " + currentTask); + LOG.warn("zk.setData() returned null for path " + task); SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); - return (false); + return FAILED_TO_OWN_TASK; } - currentVersion = stat.getVersion(); + latestZKVersion = stat.getVersion(); SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet(); - return (true); + return latestZKVersion; } catch (KeeperException e) { if (!isFirstTime) { if (e.code().equals(KeeperException.Code.NONODE)) { - LOG.warn("NONODE failed to assert ownership for " + currentTask, e); + LOG.warn("NONODE failed to assert ownership for " + task, e); } else if (e.code().equals(KeeperException.Code.BADVERSION)) { - LOG.warn("BADVERSION failed to assert ownership for " + - currentTask, e); + LOG.warn("BADVERSION failed to assert ownership for " + task, e); } else { - LOG.warn("failed to assert ownership for " + currentTask, e); + LOG.warn("failed to assert ownership for " + task, e); } } } catch (InterruptedException e1) { LOG.warn("Interrupted while trying to assert ownership of " + - currentTask + " " + StringUtils.stringifyException(e1)); + 
task + " " + StringUtils.stringifyException(e1)); Thread.currentThread().interrupt(); } SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); - return (false); + return FAILED_TO_OWN_TASK; } /** - * endTask() can fail and the only way to recover out of it is for the - * {@link SplitLogManager} to timeout the task node. - * @param slt - * @param ctr + * This function calculates how many splitters it could create based on expected average tasks per + * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration.
+ * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound) + * @param numTasks current total number of available tasks + * @return */ - private void endTask(SplitLogTask slt, AtomicLong ctr) { - String path = currentTask; - currentTask = null; + private int calculateAvailableSplitters(int numTasks) { + // at lease one RS(itself) available + int availableRSs = 1; try { - if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), - currentVersion)) { - LOG.info("successfully transitioned task " + path + " to final state " + slt); - ctr.incrementAndGet(); - return; - } - LOG.warn("failed to transistion task " + path + " to end state " + slt + - " because of version mismatch "); - } catch (KeeperException.BadVersionException bve) { - LOG.warn("transisition task " + path + " to " + slt + - " failed because of version mismatch", bve); - } catch (KeeperException.NoNodeException e) { - LOG.fatal("logic error - end task " + path + " " + slt + - " failed because task doesn't exist", e); + List<String> regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode); + availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size()); } catch (KeeperException e) { - LOG.warn("failed to end task, " + path + " " + slt, e); + // do nothing + LOG.debug("getAvailableRegionServers got ZooKeeper exception", e); } - SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet(); + + int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 
0 : 1); + expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one + // calculate how many more splitters we could spawn + return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get(); + } + + /** + * Submit a log split task to executor service + * @param curTask + * @param curTaskZKVersion + */ + void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) { + final MutableInt zkVersion = new MutableInt(curTaskZKVersion); + + CancelableProgressable reporter = new CancelableProgressable() { + private long last_report_at = 0; + + @Override + public boolean progress() { + long t = EnvironmentEdgeManager.currentTimeMillis(); + if ((t - last_report_at) > reportPeriod) { + last_report_at = t; + int latestZKVersion = + attemptToOwnTask(false, watcher, serverName, curTask, zkVersion.intValue()); + if (latestZKVersion < 0) { + LOG.warn("Failed to heartbeat the task" + curTask); + return false; + } + zkVersion.setValue(latestZKVersion); + } + return true; + } + }; + + HLogSplitterHandler hsh = + new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress, + this.splitTaskExecutor); + this.executorService.submit(hsh); } void getDataSetWatchAsync() { Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java?rev=1545052&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java (added) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java Sun Nov 24 19:21:54 2013 @@ -0,0 +1,141 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.handler; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SplitLogCounters; +import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * Handles log splitting a wal + */ +@InterfaceAudience.Private +public class HLogSplitterHandler extends 
EventHandler { + private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class); + private final ServerName serverName; + private final String curTask; + private final String wal; + private final ZooKeeperWatcher zkw; + private final CancelableProgressable reporter; + private final AtomicInteger inProgressTasks; + private final MutableInt curTaskZKVersion; + private final TaskExecutor splitTaskExecutor; + + public HLogSplitterHandler(final Server server, String curTask, + final MutableInt curTaskZKVersion, + CancelableProgressable reporter, + AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) { + super(server, EventType.RS_LOG_REPLAY); + this.curTask = curTask; + this.wal = ZKSplitLog.getFileName(curTask); + this.reporter = reporter; + this.inProgressTasks = inProgressTasks; + this.inProgressTasks.incrementAndGet(); + this.serverName = server.getServerName(); + this.zkw = server.getZooKeeper(); + this.curTaskZKVersion = curTaskZKVersion; + this.splitTaskExecutor = splitTaskExecutor; + } + + @Override + public void process() throws IOException { + long startTime = System.currentTimeMillis(); + try { + Status status = this.splitTaskExecutor.exec(wal, reporter); + switch (status) { + case DONE: + endTask(zkw, new SplitLogTask.Done(this.serverName), + SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue()); + break; + case PREEMPTED: + SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); + LOG.warn("task execution prempted " + wal); + break; + case ERR: + if (server != null && !server.isStopped()) { + endTask(zkw, new SplitLogTask.Err(this.serverName), + SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue()); + break; + } + // if the RS is exiting then there is probably a tons of stuff + // that can go wrong. Resign instead of signaling error. 
+ //$FALL-THROUGH$ + case RESIGNED: + if (server != null && server.isStopped()) { + LOG.info("task execution interrupted because worker is exiting " + curTask); + } + endTask(zkw, new SplitLogTask.Resigned(this.serverName), + SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue()); + break; + } + } finally { + LOG.info("worker " + serverName + " done with task " + curTask + " in " + + (System.currentTimeMillis() - startTime) + "ms"); + this.inProgressTasks.decrementAndGet(); + } + } + + /** + * endTask() can fail and the only way to recover out of it is for the + * {@link SplitLogManager} to timeout the task node. + * @param slt + * @param ctr + */ + public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task, + int taskZKVersion) { + try { + if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) { + LOG.info("successfully transitioned task " + task + " to final state " + slt); + ctr.incrementAndGet(); + return; + } + LOG.warn("failed to transistion task " + task + " to end state " + slt + + " because of version mismatch "); + } catch (KeeperException.BadVersionException bve) { + LOG.warn("transisition task " + task + " to " + slt + + " failed because of version mismatch", bve); + } catch (KeeperException.NoNodeException e) { + LOG.fatal( + "logic error - end task " + task + " " + slt + + " failed because task doesn't exist", e); + } catch (KeeperException e) { + LOG.warn("failed to end task, " + task + " " + slt, e); + } + SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet(); + } +} Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1545052&r1=1545051&r2=1545052&view=diff ============================================================================== --- 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sun Nov 24 19:21:54 2013 @@ -798,7 +798,7 @@ public class HLogSplitter { private OutputSink outputSink = null; WriterThread(OutputSink sink, int i) { - super("WriterThread-" + i); + super(Thread.currentThread().getName() + "-Writer-" + i); outputSink = sink; } Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1545052&r1=1545051&r2=1545052&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Sun Nov 24 19:21:54 2013 @@ -155,6 +155,7 @@ public class TestDistributedLogSplitting conf.setInt("zookeeper.recovery.retry", 0); conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing + conf.setInt("hbase.regionserver.wal.max.splitters", 3); TEST_UTIL = new HBaseTestingUtility(conf); TEST_UTIL.setDFSCluster(dfsCluster); TEST_UTIL.setZkCluster(zkCluster); @@ -192,6 +193,7 @@ public class TestDistributedLogSplitting @Test (timeout=300000) public void testRecoveredEdits() throws Exception { LOG.info("testRecoveredEdits"); + conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); startCluster(NUM_RS); @@ -248,10 +250,12 @@ public class TestDistributedLogSplitting HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, 
hri.getEncodedName())); LOG.debug("checking edits dir " + editsdir); FileStatus[] files = fs.listStatus(editsdir); - assertEquals(1, files.length); - int c = countHLog(files[0].getPath(), fs, conf); - count += c; - LOG.info(c + " edits in " + files[0].getPath()); + assertTrue(files.length > 1); + for (int i = 0; i < files.length; i++) { + int c = countHLog(files[i].getPath(), fs, conf); + count += c; + } + LOG.info(count + " edits in " + files.length + " recovered edits files."); } assertEquals(NUM_LOG_LINES, count); } @@ -259,6 +263,7 @@ public class TestDistributedLogSplitting @Test(timeout = 300000) public void testLogReplayWithNonMetaRSDown() throws Exception { LOG.info("testLogReplayWithNonMetaRSDown"); + conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); startCluster(NUM_RS); final int NUM_REGIONS_TO_CREATE = 40; Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java?rev=1545052&r1=1545051&r2=1545052&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java Sun Nov 24 19:21:54 2013 @@ -20,18 +20,25 @@ package org.apache.hadoop.hbase.regionse import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -56,6 +63,7 @@ public class TestSplitLogWorker { new HBaseTestingUtility(); private ZooKeeperWatcher zkw; private SplitLogWorker slw; + private ExecutorService executorService; private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems) throws Exception { @@ -69,14 +77,14 @@ public class TestSplitLogWorker { return waitForCounterBoolean(ctr, oldval, newval, timems, true); } - private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval, + private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval, long timems, boolean failIfTimeout) throws Exception { long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { - return (ctr.get() != oldval); + return (ctr.get() >= newval); } }); @@ -99,11 +107,18 @@ public class TestSplitLogWorker { ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode); assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); LOG.debug(zkw.splitLogZNode + " created"); + ZKUtil.createAndFailSilent(zkw, zkw.rsZNode); + assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1); SplitLogCounters.resetCounters(); + executorService = new ExecutorService("TestSplitLogWorker"); + executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10); } @After 
public void teardown() throws Exception { + if (executorService != null) { + executorService.shutdown(); + } TEST_UTIL.shutdownMiniZKCluster(); } @@ -132,11 +147,13 @@ public class TestSplitLogWorker { SplitLogCounters.resetCounters(); final String TATAS = "tatas"; final ServerName RS = ServerName.valueOf("rs,1,1"); + RegionServerServices mockedRS = getRegionServer(RS); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS), new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), RS, neverEndingTask); + SplitLogWorker slw = + new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500); @@ -169,9 +186,12 @@ public class TestSplitLogWorker { zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT), new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR1, neverEndingTask); - SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR2, neverEndingTask); + RegionServerServices mockedRS1 = getRegionServer(SVR1); + RegionServerServices mockedRS2 = getRegionServer(SVR2); + SplitLogWorker slw1 = + new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask); + SplitLogWorker slw2 = + new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask); slw1.start(); slw2.start(); try { @@ -195,7 +215,9 @@ public class TestSplitLogWorker { SplitLogCounters.resetCounters(); final ServerName SRV = ServerName.valueOf("tpt_svr,1,1"); final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"); - SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask); + 
RegionServerServices mockedRS = getRegionServer(SRV); + SplitLogWorker slw = + new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { Thread.yield(); // let the worker start @@ -226,7 +248,9 @@ public class TestSplitLogWorker { SplitLogCounters.resetCounters(); final ServerName SRV = ServerName.valueOf("tmt_svr,1,1"); final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"); - SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask); + RegionServerServices mockedRS = getRegionServer(SRV); + SplitLogWorker slw = + new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { Thread.yield(); // let the worker start @@ -266,7 +290,8 @@ public class TestSplitLogWorker { LOG.info("testRescan"); SplitLogCounters.resetCounters(); final ServerName SRV = ServerName.valueOf("svr,1,1"); - slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask); + RegionServerServices mockedRS = getRegionServer(SRV); + slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); Thread.yield(); // let the worker start Thread.sleep(100); @@ -312,4 +337,100 @@ public class TestSplitLogWorker { assertEquals(2, num); } + @Test + public void testAcquireMultiTasks() throws Exception { + LOG.info("testAcquireMultiTasks"); + SplitLogCounters.resetCounters(); + final String TATAS = "tatas"; + final ServerName RS = ServerName.valueOf("rs,1,1"); + final int maxTasks = 3; + Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks); + RegionServerServices mockedRS = getRegionServer(RS); + + for (int i = 0; i < maxTasks; i++) { + zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), + new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), + Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT); + } + + SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); + slw.start(); + try { + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, 3000); + for (int i = 0; i < maxTasks; i++) { + byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i)); + SplitLogTask slt = SplitLogTask.parseFrom(bytes); + assertTrue(slt.isOwned(RS)); + } + } finally { + stopSplitLogWorker(slw); + } + } + + /** + * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per + * RS + * @throws Exception + */ + @Test + public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception { + LOG.info("testAcquireMultiTasks"); + SplitLogCounters.resetCounters(); + final String TATAS = "tatas"; + final ServerName RS = ServerName.valueOf("rs,1,1"); + final ServerName RS2 = ServerName.valueOf("rs,1,2"); + final int maxTasks = 3; + Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks); + RegionServerServices mockedRS = getRegionServer(RS); + + // create two RS nodes + String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName()); + zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName()); + zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + for (int i = 0; i < maxTasks; i++) { + zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), + new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + + SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); + slw.start(); + try { + int acquiredTasks = 0; + waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, 3000); + for (int i = 0; i < maxTasks; i++) { + 
byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i)); + SplitLogTask slt = SplitLogTask.parseFrom(bytes); + if (slt.isOwned(RS)) { + acquiredTasks++; + } + } + assertEquals(2, acquiredTasks); + } finally { + stopSplitLogWorker(slw); + } + } + + /** + * Create a mocked region server service instance + * @param server + * @return + */ + private RegionServerServices getRegionServer(ServerName name) { + + RegionServerServices mockedServer = mock(RegionServerServices.class); + when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration()); + when(mockedServer.getServerName()).thenReturn(name); + when(mockedServer.getZooKeeper()).thenReturn(zkw); + when(mockedServer.isStopped()).thenReturn(false); + when(mockedServer.getExecutorService()).thenReturn(executorService); + + return mockedServer; + } + }