Return-Path: Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: (qmail 23513 invoked from network); 18 Apr 2011 17:16:47 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 18 Apr 2011 17:16:47 -0000 Received: (qmail 52711 invoked by uid 500); 18 Apr 2011 17:16:47 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 52665 invoked by uid 500); 18 Apr 2011 17:16:47 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 52658 invoked by uid 99); 18 Apr 2011 17:16:47 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Apr 2011 17:16:47 +0000 X-ASF-Spam-Status: No, hits=-1996.9 required=5.0 tests=ALL_TRUSTED,RISK_FREE X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Apr 2011 17:16:38 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1EF4823888BD; Mon, 18 Apr 2011 17:16:16 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1094662 [1/3] - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/main/java/org/apache/hadoop/hbase/zookeeper/ ... 
Date: Mon, 18 Apr 2011 17:16:15 -0000 To: commits@hbase.apache.org From: stack@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110418171616.1EF4823888BD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: stack Date: Mon Apr 18 17:16:15 2011 New Revision: 1094662 URL: http://svn.apache.org/viewvc?rev=1094662&view=rev Log: HBASE-1364 [performance] Distributed splitting of regionserver commit logs Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java Modified: hbase/trunk/CHANGES.txt hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Modified: hbase/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1094662&r1=1094661&r2=1094662&view=diff ============================================================================== --- hbase/trunk/CHANGES.txt (original) +++ hbase/trunk/CHANGES.txt Mon Apr 18 17:16:15 2011 @@ -200,6 +200,8 @@ Release 0.91.0 - Unreleased HBASE-3606 Create an package integration project (Eric Yang via Ryan) HBASE-3488 Add CellCounter to count multiple versions of 
rows (Subbu M. Iyer via Stack) + HBASE-1364 [performance] Distributed splitting of regionserver commit logs + (Prakash Khemani) Release 0.90.3 - Unreleased Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1094662&r1=1094661&r2=1094662&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Mon Apr 18 17:16:15 2011 @@ -301,6 +301,7 @@ implements HMasterInterface, HMasterRegi if (this.catalogTracker != null) this.catalogTracker.stop(); if (this.serverManager != null) this.serverManager.stop(); if (this.assignmentManager != null) this.assignmentManager.stop(); + if (this.fileSystemManager != null) this.fileSystemManager.stop(); HConnectionManager.deleteConnection(this.conf, true); this.zooKeeper.close(); } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1094662&r1=1094661&r2=1094662&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Mon Apr 18 17:16:15 2011 @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionser import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; /** @@ -71,6 +72,8 @@ public class MasterFileSystem { private final Path 
rootdir; // create the split log lock final Lock splitLogLock = new ReentrantLock(); + final boolean distributedLogSplitting; + final SplitLogManager splitLogManager; public MasterFileSystem(Server master, MasterMetrics metrics) throws IOException { @@ -88,6 +91,15 @@ public class MasterFileSystem { String fsUri = this.fs.getUri().toString(); conf.set("fs.default.name", fsUri); conf.set("fs.defaultFS", fsUri); + this.distributedLogSplitting = + conf.getBoolean("hbase.master.distributed.log.splitting", true); + if (this.distributedLogSplitting) { + this.splitLogManager = new SplitLogManager(master.getZooKeeper(), + master.getConfiguration(), master, master.getServerName()); + this.splitLogManager.finishInitialization(); + } else { + this.splitLogManager = null; + } // setup the filesystem variable // set up the archived logs path this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME); @@ -198,28 +210,46 @@ public class MasterFileSystem { } public void splitLog(final String serverName) { - this.splitLogLock.lock(); long splitTime = 0, splitLogSize = 0; Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName)); - try { - HLogSplitter splitter = HLogSplitter.createLogSplitter( - conf, rootdir, logDir, oldLogDir, this.fs); + if (distributedLogSplitting) { + splitTime = EnvironmentEdgeManager.currentTimeMillis(); try { - splitter.splitLog(); - } catch (OrphanHLogAfterSplitException e) { - LOG.warn("Retrying splitting because of:", e); - // An HLogSplitter instance can only be used once. Get new instance. 
- splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir, - oldLogDir, this.fs); - splitter.splitLog(); + try { + splitLogSize = splitLogManager.splitLogDistributed(logDir); + } catch (OrphanHLogAfterSplitException e) { + LOG.warn("Retrying distributed splitting for " + + serverName + "because of:", e); + splitLogManager.splitLogDistributed(logDir); + } + } catch (IOException e) { + LOG.error("Failed distributed splitting " + serverName, e); + } + splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime; + } else { + // splitLogLock ensures that dead region servers' logs are processed + // one at a time + this.splitLogLock.lock(); + + try { + HLogSplitter splitter = HLogSplitter.createLogSplitter( + conf, rootdir, logDir, oldLogDir, this.fs); + try { + splitter.splitLog(); + } catch (OrphanHLogAfterSplitException e) { + LOG.warn("Retrying splitting because of:", e); + // An HLogSplitter instance can only be used once. Get new instance. + splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir, + oldLogDir, this.fs); + splitter.splitLog(); + } + splitTime = splitter.getTime(); + splitLogSize = splitter.getSize(); + } catch (IOException e) { + LOG.error("Failed splitting " + logDir.toString(), e); + } finally { + this.splitLogLock.unlock(); } - splitTime = splitter.getTime(); - splitLogSize = splitter.getSize(); - } catch (IOException e) { - checkFileSystem(); - LOG.error("Failed splitting " + logDir.toString(), e); - } finally { - this.splitLogLock.unlock(); } if (this.metrics != null) { this.metrics.addSplit(splitTime, splitLogSize); @@ -335,4 +365,10 @@ public class MasterFileSystem { new Path(rootdir, region.getTableDesc().getNameAsString()), region.getEncodedName(), familyName), true); } + + public void stop() { + if (splitLogManager != null) { + this.splitLogManager.stop(); + } + } } Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1094662&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Mon Apr 18 17:16:15 2011 @@ -0,0 +1,933 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; +import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; + +/** + * Distributes the task of log splitting to the available region servers. + * Coordination happens via zookeeper. For every log file that has to be split a + * znode is created under /hbase/splitlog. SplitLogWorkers race to grab a task. + * + * SplitLogManager monitors the task znodes that it creates using the + * {@link #timeoutMonitor} thread. 
If a task's progress is slow then + * {@link #resubmit(String, boolean)} will take away the task from the owner + * {@link SplitLogWorker} and the task will be + * up for grabs again. When the task is done then the task's znode is deleted by + * SplitLogManager. + * + * Clients call {@link #splitLogDistributed(Path)} to split a region server's + * log files. The caller thread waits in this method until all the log files + * have been split. + * + * All the zookeeper calls made by this class are asynchronous. This is mainly + * to help reduce response time seen by the callers. + * + * There is a race in this design between the SplitLogManager and the + * SplitLogWorker. SplitLogManager might re-queue a task that has in reality + * already been completed by a SplitLogWorker. We rely on the idempotency of + * the log splitting task for correctness. + * + * It is also assumed that every log splitting task is unique and once + * completed (either with success or with error) it will not be submitted + * again. If a task is resubmitted then there is a risk that old "delete task" + * can delete the re-submission. + */ +public class SplitLogManager extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(SplitLogManager.class); + + private final Stoppable stopper; + private final String serverName; + private final TaskFinisher taskFinisher; + private FileSystem fs; + private Configuration conf; + + private long zkretries; + private long resubmit_threshold; + private long timeout; + private long unassignedTimeout; + private long lastNodeCreateTime = Long.MAX_VALUE; + + private ConcurrentMap tasks = + new ConcurrentHashMap(); + private TimeoutMonitor timeoutMonitor; + + /** + * It's OK to construct this object even when region-servers are not online. It + * does lookup the orphan tasks in zk but it doesn't block for them to be + * done. 
+ * + * @param zkw + * @param conf + * @param stopper + * @param serverName + * @param services + * @param service + */ + public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, + Stoppable stopper, String serverName) { + this(zkw, conf, stopper, serverName, new TaskFinisher() { + @Override + public Status finish(String workerName, String logfile) { + String tmpname = + ZKSplitLog.getSplitLogDirTmpComponent(workerName, logfile); + try { + HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf); + } catch (IOException e) { + LOG.warn("Could not finish splitting of log file " + logfile); + return Status.ERR; + } + return Status.DONE; + } + }); + } + public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, + Stoppable stopper, String serverName, TaskFinisher tf) { + super(zkw); + this.taskFinisher = tf; + this.conf = conf; + this.stopper = stopper; + this.zkretries = conf.getLong("hbase.splitlog.zk.retries", + ZKSplitLog.DEFAULT_ZK_RETRIES); + this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", + ZKSplitLog.DEFAULT_MAX_RESUBMIT); + this.timeout = conf.getInt("hbase.splitlog.manager.timeout", + ZKSplitLog.DEFAULT_TIMEOUT); + this.unassignedTimeout = + conf.getInt("hbase.splitlog.manager.unassigned.timeout", + ZKSplitLog.DEFAULT_UNASSIGNED_TIMEOUT); + LOG.debug("timeout = " + timeout); + LOG.debug("unassigned timeout = " + unassignedTimeout); + + this.serverName = serverName; + this.timeoutMonitor = new TimeoutMonitor( + conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", + 1000), + stopper); + } + + public void finishInitialization() { + Threads.setDaemonThreadRunning(timeoutMonitor, serverName + + ".splitLogManagerTimeoutMonitor"); + this.watcher.registerListener(this); + lookForOrphans(); + } + + /** + * The caller will block until all the log files of the given region server + * have been processed - successfully split or an error is encountered - by an + * available worker region server. 
This method must only be called after the + * region servers have been brought online. + * + * @param serverName + * region server name + * @throws IOException + * if there was an error while splitting any log file + * @return cumulative size of the logfiles split + */ + public long splitLogDistributed(final Path logDir) throws IOException { + this.fs = logDir.getFileSystem(conf); + if (!fs.exists(logDir)) { + LOG.warn(logDir + " doesn't exist. Nothing to do!"); + return 0; + } + FileStatus[] logfiles = fs.listStatus(logDir); // TODO filter filenames? + if (logfiles == null || logfiles.length == 0) { + LOG.info(logDir + " is empty dir, no logs to split"); + return 0; + } + tot_mgr_log_split_batch_start.incrementAndGet(); + LOG.info("started splitting logs in " + logDir); + long t = EnvironmentEdgeManager.currentTimeMillis(); + long totalSize = 0; + TaskBatch batch = new TaskBatch(); + for (FileStatus lf : logfiles) { + // TODO If the log file is still being written to - which is most likely + // the case for the last log file - then its length will show up here + // as zero. The size of such a file can only be retrieved after + // recover-lease is done. totalSize will be under in most cases and the + // metrics that it drives will also be under-reported. 
+ totalSize += lf.getLen(); + if (installTask(lf.getPath().toString(), batch) == false) { + throw new IOException("duplicate log split scheduled for " + + lf.getPath()); + } + } + waitTasks(batch); + if (batch.done != batch.installed) { + stopTrackingTasks(batch); + tot_mgr_log_split_batch_err.incrementAndGet(); + LOG.warn("error while splitting logs in " + logDir + + " installed = " + batch.installed + " but only " + batch.done + " done"); + throw new IOException("error or interrupt while splitting logs in " + + logDir + " Task = " + batch); + } + if (anyNewLogFiles(logDir, logfiles)) { + tot_mgr_new_unexpected_hlogs.incrementAndGet(); + LOG.warn("new hlogs were produced while logs in " + logDir + + " were being split"); + throw new OrphanHLogAfterSplitException(); + } + tot_mgr_log_split_batch_success.incrementAndGet(); + if (!fs.delete(logDir, true)) { + throw new IOException("Unable to delete src dir: " + logDir); + } + LOG.info("finished splitting (more than or equal to) " + totalSize + + " bytes in " + batch.installed + " log files in " + logDir + " in " + + (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms"); + return totalSize; + } + + boolean installTask(String taskname, TaskBatch batch) { + tot_mgr_log_split_start.incrementAndGet(); + String path = ZKSplitLog.getNodeName(watcher, taskname); + Task oldtask = createTaskIfAbsent(path, batch); + if (oldtask == null) { + // publish the task in zk + createNode(path, zkretries); + return true; + } + LOG.warn(path + "is already being split. 
" + + "Two threads cannot wait for the same task"); + return false; + } + + private void waitTasks(TaskBatch batch) { + synchronized (batch) { + while ((batch.done + batch.error) != batch.installed) { + try { + batch.wait(100); + if (stopper.isStopped()) { + LOG.warn("Stopped while waiting for log splits to be completed"); + return; + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for log splits to be completed"); + Thread.currentThread().interrupt(); + return; + } + } + } + } + + private void setDone(String path, boolean err) { + if (!ZKSplitLog.isRescanNode(watcher, path)) { + if (!err) { + tot_mgr_log_split_success.incrementAndGet(); + LOG.info("Done splitting " + path); + } else { + tot_mgr_log_split_err.incrementAndGet(); + LOG.warn("Error splitting " + path); + } + } + Task task = tasks.get(path); + if (task == null) { + if (!ZKSplitLog.isRescanNode(watcher, path)) { + tot_mgr_unacquired_orphan_done.incrementAndGet(); + LOG.debug("unacquired orphan task is done " + path); + } + } else { + // if in stopTrackingTasks() we were to make tasks orphan instead of + // forgetting about them then we will have to handle the race when + // accessing task.batch here. + if (!task.isOrphan()) { + synchronized (task.batch) { + if (!err) { + task.batch.done++; + } else { + task.batch.error++; + } + if ((task.batch.done + task.batch.error) == task.batch.installed) { + task.batch.notify(); + } + } + } + task.deleted = true; + } + // delete the task node in zk. Keep trying indefinitely - its an async + // call and no one is blocked waiting for this node to be deleted. All + // task names are unique (log.) there is no risk of deleting + // a future task. 
+ deleteNode(path, Long.MAX_VALUE); + return; + } + + private void createNode(String path, Long retry_count) { + ZKUtil.asyncCreate(this.watcher, path, + TaskState.TASK_UNASSIGNED.get(serverName), new CreateAsyncCallback(), + retry_count); + tot_mgr_node_create_queued.incrementAndGet(); + return; + } + + private void createNodeSuccess(String path) { + lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis(); + LOG.debug("put up splitlog task at znode " + path); + getDataSetWatch(path, zkretries); + } + + private void createNodeFailure(String path) { + // TODO the Manger should split the log locally instead of giving up + LOG.warn("failed to create task node" + path); + setDone(path, true); + } + + + private void getDataSetWatch(String path, Long retry_count) { + this.watcher.getZooKeeper().getData(path, this.watcher, + new GetDataAsyncCallback(), retry_count); + tot_mgr_get_data_queued.incrementAndGet(); + } + + private void getDataSetWatchSuccess(String path, byte[] data, int version) { + if (data == null) { + tot_mgr_null_data.incrementAndGet(); + LOG.fatal("logic error - got null data " + path); + setDone(path, true); + return; + } + // LOG.debug("set watch on " + path + " got data " + new String(data)); + if (TaskState.TASK_UNASSIGNED.equals(data)) { + LOG.debug("task not yet acquired " + path + " ver = " + version); + handleUnassignedTask(path); + } else if (TaskState.TASK_OWNED.equals(data)) { + registerHeartbeat(path, version, + TaskState.TASK_OWNED.getWriterName(data)); + } else if (TaskState.TASK_RESIGNED.equals(data)) { + LOG.info("task " + path + " entered state " + new String(data)); + resubmit(path, true); + } else if (TaskState.TASK_DONE.equals(data)) { + LOG.info("task " + path + " entered state " + new String(data)); + if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) { + if (taskFinisher.finish(TaskState.TASK_DONE.getWriterName(data), + ZKSplitLog.getFileName(path)) == Status.DONE) { + setDone(path, false); // success + } 
else { + resubmit(path, false); // err + } + } else { + setDone(path, false); // success + } + } else if (TaskState.TASK_ERR.equals(data)) { + LOG.info("task " + path + " entered state " + new String(data)); + resubmit(path, false); + } else { + LOG.fatal("logic error - unexpected zk state for path = " + path + + " data = " + new String(data)); + setDone(path, true); + } + } + + private void getDataSetWatchFailure(String path) { + LOG.warn("failed to set data watch " + path); + setDone(path, true); + } + + /** + * It is possible for a task to stay in UNASSIGNED state indefinitely - say + * SplitLogManager wants to resubmit a task. It forces the task to UNASSIGNED + * state but it dies before it could create the RESCAN task node to signal + * the SplitLogWorkers to pick up the task. To prevent this scenario the + * SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup. + * + * @param path + */ + private void handleUnassignedTask(String path) { + if (ZKSplitLog.isRescanNode(watcher, path)) { + return; + } + Task task = findOrCreateOrphanTask(path); + if (task.isOrphan() && (task.incarnation == 0)) { + LOG.info("resubmitting unassigned orphan task " + path); + // ignore failure to resubmit. 
The timeout-monitor will handle it later + // albeit in a more crude fashion + resubmit(path, task, true); + } + } + + private void registerHeartbeat(String path, int new_version, + String workerName) { + Task task = findOrCreateOrphanTask(path); + if (new_version != task.last_version) { + if (task.isUnassigned()) { + LOG.info("task " + path + " acquired by " + workerName); + } + // very noisy + //LOG.debug("heartbeat for " + path + " last_version=" + task.last_version + + // " last_update=" + task.last_update + " new_version=" + + // new_version); + task.last_update = EnvironmentEdgeManager.currentTimeMillis(); + task.last_version = new_version; + tot_mgr_heartbeat.incrementAndGet(); + } else { + assert false; + LOG.warn("got dup heartbeat for " + path + " ver = " + new_version); + } + return; + } + + private boolean resubmit(String path, Task task, boolean force) { + // its ok if this thread misses the update to task.deleted. It will + // fail later + if (task.deleted) { + return false; + } + int version; + if (!force) { + if ((EnvironmentEdgeManager.currentTimeMillis() - task.last_update) < + timeout) { + return false; + } + if (task.unforcedResubmits >= resubmit_threshold) { + if (task.unforcedResubmits == resubmit_threshold) { + tot_mgr_resubmit_threshold_reached.incrementAndGet(); + LOG.info("Skipping resubmissions of task " + path + + " because threshold " + resubmit_threshold + " reached"); + } + return false; + } + // race with registerHeartBeat that might be changing last_version + version = task.last_version; + } else { + version = -1; + } + LOG.info("resubmitting task " + path); + task.incarnation++; + try { + // blocking zk call but this is done from the timeout thread + if (ZKUtil.setData(this.watcher, path, + TaskState.TASK_UNASSIGNED.get(serverName), + version) == false) { + LOG.debug("failed to resubmit task " + path + + " version changed"); + return false; + } + } catch (NoNodeException e) { + LOG.debug("failed to resubmit " + path + " task 
done"); + return false; + } catch (KeeperException e) { + tot_mgr_resubmit_failed.incrementAndGet(); + LOG.warn("failed to resubmit " + path, e); + return false; + } + // don't count forced resubmits + if (!force) { + task.unforcedResubmits++; + } + task.setUnassigned(); + createRescanNode(Long.MAX_VALUE); + tot_mgr_resubmit.incrementAndGet(); + return true; + } + + private void resubmit(String path, boolean force) { + if (resubmit(path, findOrCreateOrphanTask(path), force) == false) { + setDone(path, true); // error + } + } + + private void deleteNode(String path, Long retries) { + tot_mgr_node_delete_queued.incrementAndGet(); + this.watcher.getZooKeeper().delete(path, -1, new DeleteAsyncCallback(), + retries); + } + + private void deleteNodeSuccess(String path) { + Task task; + task = tasks.remove(path); + if (task == null) { + if (ZKSplitLog.isRescanNode(watcher, path)) { + tot_mgr_rescan_deleted.incrementAndGet(); + } + tot_mgr_missing_state_in_delete.incrementAndGet(); + LOG.debug("deleted task without in memory state " + path); + return; + } + tot_mgr_task_deleted.incrementAndGet(); + } + + private void deleteNodeFailure(String path) { + LOG.fatal("logic failure, failing to delete a node should never happen " + + "because delete has infinite retries"); + return; + } + + /** + * signal the workers that a task was resubmitted by creating the + * RESCAN node. 
+ */ + private void createRescanNode(long retries) { + watcher.getZooKeeper().create(ZKSplitLog.getRescanNode(watcher), + TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL, + new CreateRescanAsyncCallback(), new Long(retries)); + } + + private void createRescanSuccess(String path) { + tot_mgr_rescan.incrementAndGet(); + getDataSetWatch(path, zkretries); + } + + private void createRescanFailure() { + LOG.fatal("logic failure, rescan failure must not happen"); + } + + /** + * @param path + * @param batch + * @return null on success, existing task on error + */ + private Task createTaskIfAbsent(String path, TaskBatch batch) { + Task oldtask; + oldtask = tasks.putIfAbsent(path, new Task(batch)); + if (oldtask != null && oldtask.isOrphan()) { + LOG.info("Previously orphan task " + path + + " is now being waited upon"); + oldtask.setBatch(batch); + return (null); + } + return oldtask; + } + + /** + * This function removes any knowledge of this batch's tasks from the + * manager. It doesn't actually stop the active tasks. If the tasks are + * resubmitted then the active tasks will be reacquired and monitored by the + * manager. It is important to call this function when batch processing + * terminates prematurely, otherwise if the tasks are re-submitted + * then they might fail. + *

+ * there is a slight race here. even after a task has been removed from + * {@link #tasks} someone who had acquired a reference to it will continue to + * process the task. That is OK since we don't actually change the task and + * the batch objects. + *

+ * TODO Its probably better to convert these to orphan tasks but then we + * have to deal with race conditions as we nullify Task's batch pointer etc. + *

+ * @param batch + */ + void stopTrackingTasks(TaskBatch batch) { + for (Map.Entry e : tasks.entrySet()) { + String path = e.getKey(); + Task t = e.getValue(); + if (t.batch == batch) { // == is correct. equals not necessary. + tasks.remove(path); + } + } + } + + Task findOrCreateOrphanTask(String path) { + Task orphanTask = new Task(null); + Task task; + task = tasks.putIfAbsent(path, orphanTask); + if (task == null) { + LOG.info("creating orphan task " + path); + tot_mgr_orphan_task_acquired.incrementAndGet(); + task = orphanTask; + } + return task; + } + + @Override + public void nodeDataChanged(String path) { + if (tasks.get(path) != null || ZKSplitLog.isRescanNode(watcher, path)) { + getDataSetWatch(path, zkretries); + } + } + + public void stop() { + if (timeoutMonitor != null) { + timeoutMonitor.interrupt(); + } + } + + private void lookForOrphans() { + List orphans; + try { + orphans = ZKUtil.listChildrenNoWatch(this.watcher, + this.watcher.splitLogZNode); + if (orphans == null) { + LOG.warn("could not get children of " + this.watcher.splitLogZNode); + return; + } + } catch (KeeperException e) { + LOG.warn("could not get children of " + this.watcher.splitLogZNode + + " " + StringUtils.stringifyException(e)); + return; + } + LOG.info("found " + orphans.size() + " orphan tasks"); + for (String path : orphans) { + LOG.info("found orphan task " + path); + getDataSetWatch(ZKSplitLog.getNodeName(watcher, path), zkretries); + } + } + + /** + * Keeps track of the batch of tasks submitted together by a caller in + * splitLogDistributed(). Clients threads use this object to wait for all + * their tasks to be done. + *

+ * All access is synchronized. + */ + static class TaskBatch { + int installed; + int done; + int error; + + @Override + public String toString() { + return ("installed = " + installed + " done = " + done + " error = " + + error); + } + } + + /** + * in memory state of an active task. + */ + static class Task { + long last_update; + int last_version; + TaskBatch batch; + boolean deleted; + int incarnation; + int unforcedResubmits; + + @Override + public String toString() { + return ("last_update = " + last_update + + " last_version = " + last_version + + " deleted = " + deleted + + " incarnation = " + incarnation + + " resubmits = " + unforcedResubmits + + " batch = " + batch); + } + + Task(TaskBatch tb) { + incarnation = 0; + last_version = -1; + deleted = false; + setBatch(tb); + setUnassigned(); + } + + public void setBatch(TaskBatch batch) { + if (batch != null && this.batch != null) { + LOG.fatal("logic error - batch being overwritten"); + } + this.batch = batch; + if (batch != null) { + batch.installed++; + } + } + + public boolean isOrphan() { + return (batch == null); + } + + public boolean isUnassigned() { + return (last_update == -1); + } + + public void setUnassigned() { + last_update = -1; + } + } + + /** + * Periodically checks all active tasks and resubmits the ones that have timed + * out + */ + private class TimeoutMonitor extends Chore { + public TimeoutMonitor(final int period, Stoppable stopper) { + super("SplitLogManager Timeout Monitor", period, stopper); + } + + @Override + protected void chore() { + int resubmitted = 0; + int unassigned = 0; + int tot = 0; + boolean found_assigned_task = false; + + for (Map.Entry e : tasks.entrySet()) { + String path = e.getKey(); + Task task = e.getValue(); + tot++; + // don't easily resubmit a task which hasn't been picked up yet. It + // might be a long while before a SplitLogWorker is free to pick up a + // task. This is because a SplitLogWorker picks up a task one at a + // time. 
If we want progress when there are no region servers then we + // will have to run a SplitLogWorker thread in the Master. + if (task.isUnassigned()) { + unassigned++; + continue; + } + found_assigned_task = true; + if (resubmit(path, task, false)) { + resubmitted++; + } + } + if (tot > 0) { + LOG.debug("total tasks = " + tot + " unassigned = " + unassigned); + } + if (resubmitted > 0) { + LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks"); + } + // If there are pending tasks and all of them have been unassigned for + // some time then put up a RESCAN node to ping the workers. + // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes + // because a. it is very unlikely that every worker had a + // transient error when trying to grab the task b. if there are no + // workers then all tasks wills stay unassigned indefinitely and the + // manager will be indefinitely creating RESCAN nodes. TODO may be the + // master should spawn both a manager and a worker thread to guarantee + // that there is always one worker in the system + if (tot > 0 && !found_assigned_task && + ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) > + unassignedTimeout)) { + createRescanNode(Long.MAX_VALUE); + tot_mgr_resubmit_unassigned.incrementAndGet(); + LOG.debug("resubmitting unassigned task(s) after timeout"); + } + } + } + + /** + * Asynchronous handler for zk create node results. + * Retries on failures. 
+ */ + class CreateAsyncCallback implements AsyncCallback.StringCallback { + private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + tot_mgr_node_create_result.incrementAndGet(); + if (rc != 0) { + if (rc == KeeperException.Code.NODEEXISTS.intValue()) { + LOG.debug("found pre-existing znode " + path); + tot_mgr_node_already_exists.incrementAndGet(); + } else { + LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + + path); + Long retry_count = (Long)ctx; + if (retry_count == 0) { + tot_mgr_node_create_err.incrementAndGet(); + createNodeFailure(path); + } else { + tot_mgr_node_create_retry.incrementAndGet(); + createNode(path, retry_count - 1); + } + return; + } + } + createNodeSuccess(path); + } + } + + /** + * Asynchronous handler for zk get-data-set-watch on node results. + * Retries on failures. + */ + class GetDataAsyncCallback implements AsyncCallback.DataCallback { + private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, + Stat stat) { + tot_mgr_get_data_result.incrementAndGet(); + if (rc != 0) { + LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " "+ path); + Long retry_count = (Long) ctx; + if (retry_count == 0) { + tot_mgr_get_data_err.incrementAndGet(); + getDataSetWatchFailure(path); + } else { + tot_mgr_get_data_retry.incrementAndGet(); + getDataSetWatch(path, retry_count - 1); + } + return; + } + getDataSetWatchSuccess(path, data, stat.getVersion()); + return; + } + } + + /** + * Asynchronous handler for zk delete node results. + * Retries on failures. 
+ */ + class DeleteAsyncCallback implements AsyncCallback.VoidCallback { + private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx) { + tot_mgr_node_delete_result.incrementAndGet(); + if (rc != 0) { + if (rc != KeeperException.Code.NONODE.intValue()) { + tot_mgr_node_delete_err.incrementAndGet(); + LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path); + Long retry_count = (Long) ctx; + if (retry_count == 0) { + LOG.warn("delete failed " + path); + deleteNodeFailure(path); + } else { + deleteNode(path, retry_count - 1); + } + return; + } else { + LOG.debug(path + + " does not exist, either was never created or was deleted" + + " in earlier rounds, zkretries = " + (Long) ctx); + } + } else { + LOG.debug("deleted " + path); + } + deleteNodeSuccess(path); + } + } + + /** + * Asynchronous handler for zk create RESCAN-node results. + * Retries on failures. + *

+ * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal + * for all the {@link SplitLogWorker}s to rescan for new tasks. + */ + class CreateRescanAsyncCallback implements AsyncCallback.StringCallback { + private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + if (rc != 0) { + LOG.warn("rc =" + KeeperException.Code.get(rc) + " for "+ path); + Long retry_count = (Long)ctx; + if (retry_count == 0) { + createRescanFailure(); + } else { + createRescanNode(retry_count - 1); + } + return; + } + // path is the original arg, name is the actual name that was created + createRescanSuccess(name); + } + } + + /** + * checks whether any new files have appeared in logDir which were + * not present in the original logfiles set + * @param logdir + * @param logfiles + * @return True if a new log file is found + * @throws IOException + */ + public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles) + throws IOException { + if (logdir == null) { + return false; + } + LOG.debug("re-listing " + logdir); + tot_mgr_relist_logdir.incrementAndGet(); + FileStatus[] newfiles = fs.listStatus(logdir); + if (newfiles == null) { + return false; + } + boolean matched; + for (FileStatus newfile : newfiles) { + matched = false; + for (FileStatus origfile : logfiles) { + if (origfile.equals(newfile)) { + matched = true; + break; + } + } + if (matched == false) { + LOG.warn("Discovered orphan hlog " + newfile + " after split." + + " Maybe HRegionServer was not dead when we started"); + return true; + } + } + return false; + } + + /** + * {@link SplitLogManager} can use objects implementing this interface to + * finish off a partially done task by {@link SplitLogWorker}. This provides + * a serialization point at the end of the task processing. 
+ */ + static public interface TaskFinisher { + /** + * status that can be returned finish() + */ + static public enum Status { + /** + * task completed successfully + */ + DONE(), + /** + * task completed with error + */ + ERR(); + } + /** + * finish the partially done task. workername provides clue to where the + * partial results of the partially done tasks are present. taskname is the + * name of the task that was put up in zookeeper. + *

+ * @param workerName + * @param taskname + * @return DONE if task completed successfully, ERR otherwise + */ + public Status finish(String workerName, String taskname); + } +} \ No newline at end of file Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1094662&r1=1094661&r2=1094662&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Apr 18 17:16:15 2011 @@ -258,6 +258,9 @@ public class HRegionServer implements HR // Cluster Status Tracker private ClusterStatusTracker clusterStatusTracker; + // Log Splitting Worker + private SplitLogWorker splitLogWorker; + // A sleeper that sleeps for msgInterval. private final Sleeper sleeper; @@ -377,7 +380,7 @@ public class HRegionServer implements HR qosMap.put(m.getName(), p.priority()); } } - + annotatedQos = qosMap; } @@ -397,7 +400,7 @@ public class HRegionServer implements HR Invocation inv = (Invocation) from; String methodName = inv.getMethodName(); - + Integer priorityByAnnotation = annotatedQos.get(methodName); if (priorityByAnnotation != null) { return priorityByAnnotation; @@ -510,6 +513,11 @@ public class HRegionServer implements HR this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); + + // Create the log splitting worker and start it + this.splitLogWorker = new SplitLogWorker(this.zooKeeper, + this.getConfiguration(), this.getServerName()); + splitLogWorker.start(); } /** @@ -636,6 +644,9 @@ public class HRegionServer implements HR } this.leases.closeAfterLeasesExpire(); this.server.stop(); + if 
(this.splitLogWorker != null) { + splitLogWorker.stop(); + } if (this.infoServer != null) { LOG.info("Stopping infoServer"); try { @@ -2816,6 +2827,11 @@ public class HRegionServer implements HR return this.compactSplitThread; } + public ZooKeeperWatcher getZooKeeperWatcher() { + return this.zooKeeper; + } + + // // Main program and support routines // Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1094662&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Mon Apr 18 17:16:15 2011 @@ -0,0 +1,549 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +/** + * This worker is spawned in every regionserver (should we also spawn one in + * the master?). The Worker waits for log splitting tasks to be put up by the + * {@link SplitLogManager} running in the master and races with other workers + * in other serves to acquire those tasks. The coordination is done via + * zookeeper. All the action takes place at /hbase/splitlog znode. + *

+ * If a worker has successfully moved the task from state UNASSIGNED to + * OWNED then it owns the task. It keeps heart beating the manager by + * periodically moving the task from OWNED to OWNED state. On success it + * moves the task to SUCCESS. On unrecoverable error it moves task state to + * ERR. If it cannot continue but wants the master to retry the task then it + * moves the task state to RESIGNED. + *

+ * The manager can take a task away from a worker by moving the task from + * OWNED to UNASSIGNED. In the absence of a global lock there is a + * unavoidable race here - a worker might have just finished its task when it + * is stripped of its ownership. Here we rely on the idempotency of the log + * splitting task for correctness + */ +public class SplitLogWorker extends ZooKeeperListener implements Runnable { + private static final Log LOG = LogFactory.getLog(SplitLogWorker.class); + + Thread worker; + private final String serverName; + private final TaskExecutor executor; + private long zkretries; + + private Object taskReadyLock = new Object(); + volatile int taskReadySeq = 0; + private volatile String currentTask = null; + private int currentVersion; + private volatile boolean exitWorker; + private Object grabTaskLock = new Object(); + private boolean workerInGrabTask = false; + + + public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, + String serverName, TaskExecutor executor) { + super(watcher); + this.serverName = serverName; + this.executor = executor; + this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3); + } + + public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf, + final String serverName) { + this(watcher, conf, serverName, new TaskExecutor () { + @Override + public Status exec(String filename, CancelableProgressable p) { + Path rootdir; + FileSystem fs; + try { + rootdir = FSUtils.getRootDir(conf); + fs = rootdir.getFileSystem(conf); + } catch (IOException e) { + LOG.warn("could not find root dir or fs", e); + return Status.RESIGNED; + } + // TODO have to correctly figure out when log splitting has been + // interrupted or has encountered a transient error and when it has + // encountered a bad non-retry-able persistent error. 
+ try { + String tmpname = + ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename); + if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname, + fs.getFileStatus(new Path(filename)), fs, conf, p) == false) { + return Status.PREEMPTED; + } + } catch (InterruptedIOException iioe) { + LOG.warn("log splitting of " + filename + " interrupted, resigning", + iioe); + return Status.RESIGNED; + } catch (IOException e) { + Throwable cause = e.getCause(); + if (cause instanceof InterruptedException) { + LOG.warn("log splitting of " + filename + " interrupted, resigning", + e); + return Status.RESIGNED; + } + LOG.warn("log splitting of " + filename + " failed, returning error", + e); + return Status.ERR; + } + return Status.DONE; + } + }); + } + + @Override + public void run() { + LOG.info("SplitLogWorker starting"); + this.watcher.registerListener(this); + int res; + // wait for master to create the splitLogZnode + res = -1; + while (res == -1) { + try { + res = ZKUtil.checkExists(watcher, watcher.splitLogZNode); + } catch (KeeperException e) { + // ignore + LOG.warn("Exception when checking for " + watcher.splitLogZNode + + " ... retrying", e); + } + if (res == -1) { + try { + LOG.info(watcher.splitLogZNode + " znode does not exist," + + " waiting for master to create one"); + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode); + assert exitWorker == true; + } + } + } + + taskLoop(); + + LOG.info("SplitLogWorker exiting"); + } + + /** + * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task + * one at a time. This policy puts an upper-limit on the number of + * simultaneous log splitting that could be happening in a cluster. + *

+ * Synchronization using {@link #task_ready_signal_seq} ensures that it will + * try to grab every task that has been put up + */ + private void taskLoop() { + while (true) { + int seq_start = taskReadySeq; + List paths = getTaskList(); + if (paths == null) { + LOG.warn("Could not get tasks, did someone remove " + + this.watcher.splitLogZNode + " ... worker thread exiting."); + return; + } + int offset = (int)(Math.random() * paths.size()); + for (int i = 0; i < paths.size(); i ++) { + int idx = (i + offset) % paths.size(); + // don't call ZKSplitLog.getNodeName() because that will lead to + // double encoding of the path name + grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx))); + if (exitWorker == true) { + return; + } + } + synchronized (taskReadyLock) { + while (seq_start == taskReadySeq) { + try { + taskReadyLock.wait(); + } catch (InterruptedException e) { + LOG.warn("SplitLogWorker inteurrpted while waiting for task," + + " exiting", e); + assert exitWorker == true; + return; + } + } + } + } + } + + /** + * try to grab a 'lock' on the task zk node to own and execute the task. + *

+ * @param path zk node for the task + */ + private void grabTask(String path) { + Stat stat = new Stat(); + long t = -1; + byte[] data; + synchronized (grabTaskLock) { + currentTask = path; + workerInGrabTask = true; + if (Thread.interrupted()) { + return; + } + } + try { + try { + if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) { + tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); + return; + } + } catch (KeeperException e) { + LOG.warn("Failed to get data for znode " + path, e); + tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + return; + } + if (TaskState.TASK_UNASSIGNED.equals(data) == false) { + tot_wkr_failed_to_grab_task_owned.incrementAndGet(); + return; + } + + currentVersion = stat.getVersion(); + if (ownTask() == false) { + tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); + return; + } + + if (ZKSplitLog.isRescanNode(watcher, currentTask)) { + endTask(TaskState.TASK_DONE, tot_wkr_task_acquired_rescan); + return; + } + LOG.info("worker " + serverName + " acquired task " + path); + tot_wkr_task_acquired.incrementAndGet(); + getDataSetWatchAsync(); + + t = System.currentTimeMillis(); + TaskExecutor.Status status; + + status = executor.exec(ZKSplitLog.getFileName(currentTask), + new CancelableProgressable() { + + @Override + public boolean progress() { + if (ownTask() == false) { + LOG.warn("Failed to heartbeat the task" + currentTask); + return false; + } + return true; + } + }); + switch (status) { + case DONE: + endTask(TaskState.TASK_DONE, tot_wkr_task_done); + break; + case PREEMPTED: + tot_wkr_preempt_task.incrementAndGet(); + LOG.warn("task execution prempted " + path); + break; + case ERR: + if (!exitWorker) { + endTask(TaskState.TASK_ERR, tot_wkr_task_err); + break; + } + // if the RS is exiting then there is probably a tons of stuff + // that can go wrong. Resign instead of signaling error. 
+ //$FALL-THROUGH$ + case RESIGNED: + if (exitWorker) { + LOG.info("task execution interrupted because worker is exiting " + + path); + endTask(TaskState.TASK_RESIGNED, tot_wkr_task_resigned); + } else { + tot_wkr_preempt_task.incrementAndGet(); + LOG.info("task execution interrupted via zk by manager " + + path); + } + break; + } + } finally { + if (t > 0) { + LOG.info("worker " + serverName + " done with task " + path + + " in " + (System.currentTimeMillis() - t) + "ms"); + } + synchronized (grabTaskLock) { + workerInGrabTask = false; + // clear the interrupt from stopTask() otherwise the next task will + // suffer + Thread.interrupted(); + } + } + return; + } + + /** + * Try to own the task by transitioning the zk node data from UNASSIGNED to + * OWNED. + *

+ * This method is also used to periodically heartbeat the task progress by + * transitioning the node from OWNED to OWNED. + *

+ * @return true if task path is successfully locked + */ + private boolean ownTask() { + try { + Stat stat = this.watcher.getZooKeeper().setData(currentTask, + TaskState.TASK_OWNED.get(serverName), currentVersion); + if (stat == null) { + return (false); + } + currentVersion = stat.getVersion(); + if (LOG.isDebugEnabled()) { + LOG.debug ("hearbeat for path " + currentTask + + " successful, version = " + currentVersion); + } + tot_wkr_task_heartbeat.incrementAndGet(); + return (true); + } catch (KeeperException e) { + // either Bad Version or Node has been removed + LOG.warn("failed to assert ownership for " + currentTask, e); + } catch (InterruptedException e1) { + LOG.warn("Interrupted while trying to assert ownership of " + + currentTask + " " + StringUtils.stringifyException(e1)); + Thread.currentThread().interrupt(); + } + tot_wkr_task_heartbeat_failed.incrementAndGet(); + return (false); + } + + /** + * endTask() can fail and the only way to recover out of it is for the + * {@link SplitLogManager} to timeout the task node. 
+ * @param ts + * @param ctr + */ + private void endTask(ZKSplitLog.TaskState ts, AtomicLong ctr) { + String path = currentTask; + currentTask = null; + try { + if (ZKUtil.setData(this.watcher, path, ts.get(serverName), + currentVersion)) { + LOG.info("successfully transitioned task " + path + + " to final state " + ts); + ctr.incrementAndGet(); + return; + } + LOG.warn("failed to transistion task " + path + " to end state " + ts + + " because of version mismatch "); + } catch (KeeperException.BadVersionException bve) { + LOG.warn("transisition task " + path + " to " + ts + + " failed because of version mismatch", bve); + } catch (KeeperException.NoNodeException e) { + LOG.fatal("logic error - end task " + path + " " + ts + + " failed because task doesn't exist", e); + } catch (KeeperException e) { + LOG.warn("failed to end task, " + path + " " + ts, e); + } + tot_wkr_final_transistion_failed.incrementAndGet(); + return; + } + + void getDataSetWatchAsync() { + this.watcher.getZooKeeper().getData(currentTask, this.watcher, + new GetDataAsyncCallback(), null); + tot_wkr_get_data_queued.incrementAndGet(); + } + + void getDataSetWatchSuccess(String path, byte[] data) { + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change but that's ok + String taskpath = currentTask; + if (taskpath != null && taskpath.equals(path)) { + // have to compare data. cannot compare version because then there + // will be race with ownTask() + // cannot just check whether the node has been transitioned to + // UNASSIGNED because by the time this worker sets the data watch + // the node might have made two transitions - from owned by this + // worker to unassigned to owned by another worker + if (! TaskState.TASK_OWNED.equals(data, serverName) && + ! TaskState.TASK_DONE.equals(data, serverName) && + ! TaskState.TASK_ERR.equals(data, serverName) && + ! 
TaskState.TASK_RESIGNED.equals(data, serverName)) { + LOG.info("task " + taskpath + " preempted from server " + + serverName + " ... current task state and owner - " + + new String(data)); + stopTask(); + } + } + } + } + } + + void getDataSetWatchFailure(String path) { + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change but that's ok + String taskpath = currentTask; + if (taskpath != null && taskpath.equals(path)) { + LOG.info("retrying data watch on " + path); + tot_wkr_get_data_retry.incrementAndGet(); + getDataSetWatchAsync(); + } else { + // no point setting a watch on the task which this worker is not + // working upon anymore + } + } + } + } + + + + + @Override + public void nodeDataChanged(String path) { + // there will be a self generated dataChanged event every time ownTask() + // heartbeats the task znode by upping its version + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change + String taskpath = currentTask; + if (taskpath!= null && taskpath.equals(path)) { + getDataSetWatchAsync(); + } + } + } + } + + + private List getTaskList() { + for (int i = 0; i < zkretries; i++) { + try { + return (ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, + this.watcher.splitLogZNode)); + } catch (KeeperException e) { + LOG.warn("Could not get children of znode " + + this.watcher.splitLogZNode, e); + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + LOG.warn("Interrupted while trying to get task list ...", e1); + Thread.currentThread().interrupt(); + return null; + } + } + } + LOG.warn("Tried " + zkretries + " times, still couldn't fetch " + + "children of " + watcher.splitLogZNode + " giving up"); + return null; + } + + + @Override + public void nodeChildrenChanged(String path) { + if(path.equals(watcher.splitLogZNode)) { + LOG.debug("tasks arrived or departed"); + synchronized (taskReadyLock) { + taskReadySeq++; + taskReadyLock.notify(); + } + } + } + + /** + * If the worker is 
doing a task i.e. splitting a log file then stop the task. + * It doesn't exit the worker thread. + */ + void stopTask() { + LOG.info("Sending interrupt to stop the worker thread"); + worker.interrupt(); // TODO interrupt often gets swallowed, do what else? + } + + + /** + * start the SplitLogWorker thread + */ + public void start() { + worker = new Thread(null, this, "SplitLogWorker-" + serverName); + exitWorker = false; + worker.start(); + return; + } + + /** + * stop the SplitLogWorker thread + */ + public void stop() { + exitWorker = true; + stopTask(); + } + + /** + * Asynchronous handler for zk get-data-set-watch on node results. + */ + class GetDataAsyncCallback implements AsyncCallback.DataCallback { + private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, + Stat stat) { + tot_wkr_get_data_result.incrementAndGet(); + if (rc != 0) { + LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); + getDataSetWatchFailure(path); + return; + } + getDataSetWatchSuccess(path, data); + return; + } + } + + /** + * Objects implementing this interface actually do the task that has been + * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight + * guarantee that two workers will not be executing the same task therefore it + * is better to have workers prepare the task and then have the + * {@link SplitLogManager} commit the work in + * {@link SplitLogManager.TaskFinisher} + */ + static public interface TaskExecutor { + static public enum Status { + DONE(), + ERR(), + RESIGNED(), + PREEMPTED(); + } + public Status exec(String name, CancelableProgressable p); + } +}