From commits-return-72941-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Fri May 18 16:47:52 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 6FA591807B5 for ; Fri, 18 May 2018 16:47:49 +0200 (CEST) Received: (qmail 61669 invoked by uid 500); 18 May 2018 14:47:46 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 60583 invoked by uid 99); 18 May 2018 14:47:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 May 2018 14:47:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 91F3BDFFAC; Fri, 18 May 2018 14:47:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Fri, 18 May 2018 14:48:05 -0000 Message-Id: <64a31b7250584c47a8fd88e6a9c88b96@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [22/29] hbase-site git commit: Published site at c9f8c3436f6e38b5c7807677c5c3e7fc3e19e071. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/ead846d7/devapidocs/org/apache/hadoop/hbase/util/package-tree.html ---------------------------------------------------------------------- diff --git a/devapidocs/org/apache/hadoop/hbase/util/package-tree.html b/devapidocs/org/apache/hadoop/hbase/util/package-tree.html index 6c464c1..845224b 100644 --- a/devapidocs/org/apache/hadoop/hbase/util/package-tree.html +++ b/devapidocs/org/apache/hadoop/hbase/util/package-tree.html @@ -532,14 +532,14 @@ http://git-wip-us.apache.org/repos/asf/hbase-site/blob/ead846d7/devapidocs/org/apache/hadoop/hbase/wal/package-tree.html ---------------------------------------------------------------------- diff --git a/devapidocs/org/apache/hadoop/hbase/wal/package-tree.html b/devapidocs/org/apache/hadoop/hbase/wal/package-tree.html index 9ec5b35..c0e28b4 100644 --- a/devapidocs/org/apache/hadoop/hbase/wal/package-tree.html +++ b/devapidocs/org/apache/hadoop/hbase/wal/package-tree.html @@ -189,8 +189,8 @@ http://git-wip-us.apache.org/repos/asf/hbase-site/blob/ead846d7/devapidocs/src-html/org/apache/hadoop/hbase/Version.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/Version.html b/devapidocs/src-html/org/apache/hadoop/hbase/Version.html index f05b00d..9bc07e3 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/Version.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/Version.html @@ -16,11 +16,11 @@ 008@InterfaceAudience.Private 009public class Version { 010 public static final String version = "3.0.0-SNAPSHOT"; -011 public static final String revision = "cf529f18a9959589fa635f78df4840472526ea2c"; +011 public static final String revision = "c9f8c3436f6e38b5c7807677c5c3e7fc3e19e071"; 012 public static final String user = "jenkins"; -013 public static final String date = "Thu May 17 14:39:20 UTC 2018"; +013 public static final String date = "Fri May 18 14:39:13 UTC 2018"; 014 public static final String url = "git://jenkins-websites1.apache.org/home/jenkins/jenkins-slave/workspace/hbase_generate_website/hbase"; -015 public static final String srcChecksum = "5e1af00298043edb8a0644db5a2c28dc"; +015 public static final String srcChecksum = "b02d3f1d4a3395e22f561b8280caf974"; 016} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/ead846d7/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/SplitLogWorker.TaskExecutor.Status.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/SplitLogWorker.TaskExecutor.Status.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/SplitLogWorker.TaskExecutor.Status.html index 02dbc37..2547651 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/SplitLogWorker.TaskExecutor.Status.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/SplitLogWorker.TaskExecutor.Status.html @@ -26,192 +26,198 @@ 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 -021import java.io.IOException; -022import java.io.InterruptedIOException; -023import java.net.ConnectException; -024import java.net.SocketTimeoutException; -025 -026import org.apache.yetus.audience.InterfaceAudience; -027import org.slf4j.Logger; -028import org.slf4j.LoggerFactory; -029import org.apache.hadoop.conf.Configuration; -030import org.apache.hadoop.fs.FileSystem; -031import org.apache.hadoop.fs.Path; -032import org.apache.hadoop.hbase.NotServingRegionException; -033import org.apache.hadoop.hbase.Server; -034import org.apache.hadoop.hbase.client.RetriesExhaustedException; -035import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; -036import org.apache.hadoop.hbase.wal.WALFactory; -037import org.apache.hadoop.hbase.wal.WALSplitter; -038import org.apache.hadoop.hbase.util.CancelableProgressable; -039import org.apache.hadoop.hbase.util.ExceptionUtil; -040import org.apache.hadoop.hbase.util.FSUtils; -041 -042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -043 -044/** -045 * This worker is spawned in every regionserver, including master. The Worker waits for log -046 * splitting tasks to be put up by the {@link org.apache.hadoop.hbase.master.SplitLogManager} -047 * running in the master and races with other workers in other serves to acquire those tasks. -048 * The coordination is done via coordination engine. -049 * <p> -050 * If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task. -051 * It keeps heart beating the manager by periodically moving the task from UNASSIGNED to OWNED -052 * state. On success it moves the task to TASK_DONE. On unrecoverable error it moves task state to -053 * ERR. If it cannot continue but wants the master to retry the task then it moves the task state to -054 * RESIGNED. -055 * <p> -056 * The manager can take a task away from a worker by moving the task from OWNED to UNASSIGNED. In -057 * the absence of a global lock there is a unavoidable race here - a worker might have just finished -058 * its task when it is stripped of its ownership. Here we rely on the idempotency of the log -059 * splitting task for correctness -060 */ -061@InterfaceAudience.Private -062public class SplitLogWorker implements Runnable { -063 -064 private static final Logger LOG = LoggerFactory.getLogger(SplitLogWorker.class); -065 -066 Thread worker; -067 // thread pool which executes recovery work -068 private SplitLogWorkerCoordination coordination; -069 private Configuration conf; -070 private RegionServerServices server; -071 -072 public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, -073 TaskExecutor splitTaskExecutor) { -074 this.server = server; -075 this.conf = conf; -076 this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination(); -077 coordination.init(server, conf, splitTaskExecutor, this); -078 } -079 -080 public SplitLogWorker(final Server hserver, final Configuration conf, -081 final RegionServerServices server, final LastSequenceId sequenceIdChecker, -082 final WALFactory factory) { -083 this(hserver, conf, server, new TaskExecutor() { -084 @Override -085 public Status exec(String filename, CancelableProgressable p) { -086 Path walDir; -087 FileSystem fs; -088 try { -089 walDir = FSUtils.getWALRootDir(conf); -090 fs = walDir.getFileSystem(conf); -091 } catch (IOException e) { -092 LOG.warn("could not find root dir or fs", e); -093 return Status.RESIGNED; -094 } -095 // TODO have to correctly figure out when log splitting has been -096 // interrupted or has encountered a transient error and when it has -097 // encountered a bad non-retry-able persistent error. -098 try { -099 if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), -100 fs, conf, p, sequenceIdChecker, -101 server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) { -102 return Status.PREEMPTED; -103 } -104 } catch (InterruptedIOException iioe) { -105 LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe); -106 return Status.RESIGNED; -107 } catch (IOException e) { -108 Throwable cause = e.getCause(); -109 if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException -110 || cause instanceof ConnectException -111 || cause instanceof SocketTimeoutException)) { -112 LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " -113 + "resigning", e); -114 return Status.RESIGNED; -115 } else if (cause instanceof InterruptedException) { -116 LOG.warn("log splitting of " + filename + " interrupted, resigning", e); -117 return Status.RESIGNED; -118 } -119 LOG.warn("log splitting of " + filename + " failed, returning error", e); -120 return Status.ERR; -121 } -122 return Status.DONE; -123 } -124 }); -125 } -126 -127 @Override -128 public void run() { -129 try { -130 LOG.info("SplitLogWorker " + server.getServerName() + " starting"); -131 coordination.registerListener(); -132 // wait for Coordination Engine is ready -133 boolean res = false; -134 while (!res && !coordination.isStop()) { -135 res = coordination.isReady(); -136 } -137 if (!coordination.isStop()) { -138 coordination.taskLoop(); -139 } -140 } catch (Throwable t) { -141 if (ExceptionUtil.isInterrupt(t)) { -142 LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" : -143 " (ERROR: exitWorker is not set, exiting anyway)")); -144 } else { -145 // only a logical error can cause here. Printing it out -146 // to make debugging easier -147 LOG.error("unexpected error ", t); -148 } -149 } finally { -150 coordination.removeListener(); -151 LOG.info("SplitLogWorker " + server.getServerName() + " exiting"); -152 } -153 } -154 -155 /** -156 * If the worker is doing a task i.e. splitting a log file then stop the task. -157 * It doesn't exit the worker thread. -158 */ -159 public void stopTask() { -160 LOG.info("Sending interrupt to stop the worker thread"); -161 worker.interrupt(); // TODO interrupt often gets swallowed, do what else? -162 } -163 -164 /** -165 * start the SplitLogWorker thread -166 */ -167 public void start() { -168 worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName().toShortString()); -169 worker.start(); -170 } -171 -172 /** -173 * stop the SplitLogWorker thread -174 */ -175 public void stop() { -176 coordination.stopProcessingTasks(); -177 stopTask(); -178 } -179 -180 /** -181 * Objects implementing this interface actually do the task that has been -182 * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight -183 * guarantee that two workers will not be executing the same task therefore it -184 * is better to have workers prepare the task and then have the -185 * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in -186 * SplitLogManager.TaskFinisher -187 */ -188 public interface TaskExecutor { -189 enum Status { -190 DONE(), -191 ERR(), -192 RESIGNED(), -193 PREEMPTED() -194 } -195 Status exec(String name, CancelableProgressable p); -196 } -197 -198 /** -199 * Returns the number of tasks processed by coordination. -200 * This method is used by tests only -201 */ -202 @VisibleForTesting -203 public int getTaskReadySeq() { -204 return coordination.getTaskReadySeq(); -205 } -206} +021import java.io.FileNotFoundException; +022import java.io.IOException; +023import java.io.InterruptedIOException; +024import java.net.ConnectException; +025import java.net.SocketTimeoutException; +026 +027import org.apache.yetus.audience.InterfaceAudience; +028import org.slf4j.Logger; +029import org.slf4j.LoggerFactory; +030import org.apache.hadoop.conf.Configuration; +031import org.apache.hadoop.fs.FileSystem; +032import org.apache.hadoop.fs.Path; +033import org.apache.hadoop.hbase.NotServingRegionException; +034import org.apache.hadoop.hbase.Server; +035import org.apache.hadoop.hbase.client.RetriesExhaustedException; +036import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; +037import org.apache.hadoop.hbase.wal.WALFactory; +038import org.apache.hadoop.hbase.wal.WALSplitter; +039import org.apache.hadoop.hbase.util.CancelableProgressable; +040import org.apache.hadoop.hbase.util.ExceptionUtil; +041import org.apache.hadoop.hbase.util.FSUtils; +042 +043import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +044 +045/** +046 * This worker is spawned in every regionserver, including master. The Worker waits for log +047 * splitting tasks to be put up by the {@link org.apache.hadoop.hbase.master.SplitLogManager} +048 * running in the master and races with other workers in other serves to acquire those tasks. +049 * The coordination is done via coordination engine. +050 * <p> +051 * If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task. +052 * It keeps heart beating the manager by periodically moving the task from UNASSIGNED to OWNED +053 * state. On success it moves the task to TASK_DONE. On unrecoverable error it moves task state to +054 * ERR. If it cannot continue but wants the master to retry the task then it moves the task state to +055 * RESIGNED. +056 * <p> +057 * The manager can take a task away from a worker by moving the task from OWNED to UNASSIGNED. In +058 * the absence of a global lock there is a unavoidable race here - a worker might have just finished +059 * its task when it is stripped of its ownership. Here we rely on the idempotency of the log +060 * splitting task for correctness +061 */ +062@InterfaceAudience.Private +063public class SplitLogWorker implements Runnable { +064 +065 private static final Logger LOG = LoggerFactory.getLogger(SplitLogWorker.class); +066 +067 Thread worker; +068 // thread pool which executes recovery work +069 private SplitLogWorkerCoordination coordination; +070 private Configuration conf; +071 private RegionServerServices server; +072 +073 public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, +074 TaskExecutor splitTaskExecutor) { +075 this.server = server; +076 this.conf = conf; +077 this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination(); +078 coordination.init(server, conf, splitTaskExecutor, this); +079 } +080 +081 public SplitLogWorker(final Server hserver, final Configuration conf, +082 final RegionServerServices server, final LastSequenceId sequenceIdChecker, +083 final WALFactory factory) { +084 this(hserver, conf, server, new TaskExecutor() { +085 @Override +086 public Status exec(String filename, CancelableProgressable p) { +087 Path walDir; +088 FileSystem fs; +089 try { +090 walDir = FSUtils.getWALRootDir(conf); +091 fs = walDir.getFileSystem(conf); +092 } catch (IOException e) { +093 LOG.warn("could not find root dir or fs", e); +094 return Status.RESIGNED; +095 } +096 // TODO have to correctly figure out when log splitting has been +097 // interrupted or has encountered a transient error and when it has +098 // encountered a bad non-retry-able persistent error. +099 try { +100 if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), +101 fs, conf, p, sequenceIdChecker, +102 server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) { +103 return Status.PREEMPTED; +104 } +105 } catch (InterruptedIOException iioe) { +106 LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe); +107 return Status.RESIGNED; +108 } catch (IOException e) { +109 if (e instanceof FileNotFoundException) { +110 // A wal file may not exist anymore. Nothing can be recovered so move on +111 LOG.warn("WAL {} does not exist anymore", filename, e); +112 return Status.DONE; +113 } +114 Throwable cause = e.getCause(); +115 if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException +116 || cause instanceof ConnectException +117 || cause instanceof SocketTimeoutException)) { +118 LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " +119 + "resigning", e); +120 return Status.RESIGNED; +121 } else if (cause instanceof InterruptedException) { +122 LOG.warn("log splitting of " + filename + " interrupted, resigning", e); +123 return Status.RESIGNED; +124 } +125 LOG.warn("log splitting of " + filename + " failed, returning error", e); +126 return Status.ERR; +127 } +128 return Status.DONE; +129 } +130 }); +131 } +132 +133 @Override +134 public void run() { +135 try { +136 LOG.info("SplitLogWorker " + server.getServerName() + " starting"); +137 coordination.registerListener(); +138 // wait for Coordination Engine is ready +139 boolean res = false; +140 while (!res && !coordination.isStop()) { +141 res = coordination.isReady(); +142 } +143 if (!coordination.isStop()) { +144 coordination.taskLoop(); +145 } +146 } catch (Throwable t) { +147 if (ExceptionUtil.isInterrupt(t)) { +148 LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" : +149 " (ERROR: exitWorker is not set, exiting anyway)")); +150 } else { +151 // only a logical error can cause here. Printing it out +152 // to make debugging easier +153 LOG.error("unexpected error ", t); +154 } +155 } finally { +156 coordination.removeListener(); +157 LOG.info("SplitLogWorker " + server.getServerName() + " exiting"); +158 } +159 } +160 +161 /** +162 * If the worker is doing a task i.e. splitting a log file then stop the task. +163 * It doesn't exit the worker thread. +164 */ +165 public void stopTask() { +166 LOG.info("Sending interrupt to stop the worker thread"); +167 worker.interrupt(); // TODO interrupt often gets swallowed, do what else? +168 } +169 +170 /** +171 * start the SplitLogWorker thread +172 */ +173 public void start() { +174 worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName().toShortString()); +175 worker.start(); +176 } +177 +178 /** +179 * stop the SplitLogWorker thread +180 */ +181 public void stop() { +182 coordination.stopProcessingTasks(); +183 stopTask(); +184 } +185 +186 /** +187 * Objects implementing this interface actually do the task that has been +188 * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight +189 * guarantee that two workers will not be executing the same task therefore it +190 * is better to have workers prepare the task and then have the +191 * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in +192 * SplitLogManager.TaskFinisher +193 */ +194 public interface TaskExecutor { +195 enum Status { +196 DONE(), +197 ERR(), +198 RESIGNED(), +199 PREEMPTED() +200 } +201 Status exec(String name, CancelableProgressable p); +202 } +203 +204 /** +205 * Returns the number of tasks processed by coordination. +206 * This method is used by tests only +207 */ +208 @VisibleForTesting +209 public int getTaskReadySeq() { +210 return coordination.getTaskReadySeq(); +211 } +212} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/ead846d7/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/SplitLogWorker.TaskExecutor.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/SplitLogWorker.TaskExecutor.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/SplitLogWorker.TaskExecutor.html index 02dbc37..2547651 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/SplitLogWorker.TaskExecutor.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/SplitLogWorker.TaskExecutor.html @@ -26,192 +26,198 @@ 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 -021import java.io.IOException; -022import java.io.InterruptedIOException; -023import java.net.ConnectException; -024import java.net.SocketTimeoutException; -025 -026import org.apache.yetus.audience.InterfaceAudience; -027import org.slf4j.Logger; -028import org.slf4j.LoggerFactory; -029import org.apache.hadoop.conf.Configuration; -030import org.apache.hadoop.fs.FileSystem; -031import org.apache.hadoop.fs.Path; -032import org.apache.hadoop.hbase.NotServingRegionException; -033import org.apache.hadoop.hbase.Server; -034import org.apache.hadoop.hbase.client.RetriesExhaustedException; -035import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; -036import org.apache.hadoop.hbase.wal.WALFactory; -037import org.apache.hadoop.hbase.wal.WALSplitter; -038import org.apache.hadoop.hbase.util.CancelableProgressable; -039import org.apache.hadoop.hbase.util.ExceptionUtil; -040import org.apache.hadoop.hbase.util.FSUtils; -041 -042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -043 -044/** -045 * This worker is spawned in every regionserver, including master. The Worker waits for log -046 * splitting tasks to be put up by the {@link org.apache.hadoop.hbase.master.SplitLogManager} -047 * running in the master and races with other workers in other serves to acquire those tasks. -048 * The coordination is done via coordination engine. -049 * <p> -050 * If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task. -051 * It keeps heart beating the manager by periodically moving the task from UNASSIGNED to OWNED -052 * state. On success it moves the task to TASK_DONE. On unrecoverable error it moves task state to -053 * ERR. If it cannot continue but wants the master to retry the task then it moves the task state to -054 * RESIGNED. -055 * <p> -056 * The manager can take a task away from a worker by moving the task from OWNED to UNASSIGNED. In -057 * the absence of a global lock there is a unavoidable race here - a worker might have just finished -058 * its task when it is stripped of its ownership. Here we rely on the idempotency of the log -059 * splitting task for correctness -060 */ -061@InterfaceAudience.Private -062public class SplitLogWorker implements Runnable { -063 -064 private static final Logger LOG = LoggerFactory.getLogger(SplitLogWorker.class); -065 -066 Thread worker; -067 // thread pool which executes recovery work -068 private SplitLogWorkerCoordination coordination; -069 private Configuration conf; -070 private RegionServerServices server; -071 -072 public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, -073 TaskExecutor splitTaskExecutor) { -074 this.server = server; -075 this.conf = conf; -076 this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination(); -077 coordination.init(server, conf, splitTaskExecutor, this); -078 } -079 -080 public SplitLogWorker(final Server hserver, final Configuration conf, -081 final RegionServerServices server, final LastSequenceId sequenceIdChecker, -082 final WALFactory factory) { -083 this(hserver, conf, server, new TaskExecutor() { -084 @Override -085 public Status exec(String filename, CancelableProgressable p) { -086 Path walDir; -087 FileSystem fs; -088 try { -089 walDir = FSUtils.getWALRootDir(conf); -090 fs = walDir.getFileSystem(conf); -091 } catch (IOException e) { -092 LOG.warn("could not find root dir or fs", e); -093 return Status.RESIGNED; -094 } -095 // TODO have to correctly figure out when log splitting has been -096 // interrupted or has encountered a transient error and when it has -097 // encountered a bad non-retry-able persistent error. -098 try { -099 if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), -100 fs, conf, p, sequenceIdChecker, -101 server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) { -102 return Status.PREEMPTED; -103 } -104 } catch (InterruptedIOException iioe) { -105 LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe); -106 return Status.RESIGNED; -107 } catch (IOException e) { -108 Throwable cause = e.getCause(); -109 if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException -110 || cause instanceof ConnectException -111 || cause instanceof SocketTimeoutException)) { -112 LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " -113 + "resigning", e); -114 return Status.RESIGNED; -115 } else if (cause instanceof InterruptedException) { -116 LOG.warn("log splitting of " + filename + " interrupted, resigning", e); -117 return Status.RESIGNED; -118 } -119 LOG.warn("log splitting of " + filename + " failed, returning error", e); -120 return Status.ERR; -121 } -122 return Status.DONE; -123 } -124 }); -125 } -126 -127 @Override -128 public void run() { -129 try { -130 LOG.info("SplitLogWorker " + server.getServerName() + " starting"); -131 coordination.registerListener(); -132 // wait for Coordination Engine is ready -133 boolean res = false; -134 while (!res && !coordination.isStop()) { -135 res = coordination.isReady(); -136 } -137 if (!coordination.isStop()) { -138 coordination.taskLoop(); -139 } -140 } catch (Throwable t) { -141 if (ExceptionUtil.isInterrupt(t)) { -142 LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" : -143 " (ERROR: exitWorker is not set, exiting anyway)")); -144 } else { -145 // only a logical error can cause here. Printing it out -146 // to make debugging easier -147 LOG.error("unexpected error ", t); -148 } -149 } finally { -150 coordination.removeListener(); -151 LOG.info("SplitLogWorker " + server.getServerName() + " exiting"); -152 } -153 } -154 -155 /** -156 * If the worker is doing a task i.e. splitting a log file then stop the task. -157 * It doesn't exit the worker thread. -158 */ -159 public void stopTask() { -160 LOG.info("Sending interrupt to stop the worker thread"); -161 worker.interrupt(); // TODO interrupt often gets swallowed, do what else? -162 } -163 -164 /** -165 * start the SplitLogWorker thread -166 */ -167 public void start() { -168 worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName().toShortString()); -169 worker.start(); -170 } -171 -172 /** -173 * stop the SplitLogWorker thread -174 */ -175 public void stop() { -176 coordination.stopProcessingTasks(); -177 stopTask(); -178 } -179 -180 /** -181 * Objects implementing this interface actually do the task that has been -182 * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight -183 * guarantee that two workers will not be executing the same task therefore it -184 * is better to have workers prepare the task and then have the -185 * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in -186 * SplitLogManager.TaskFinisher -187 */ -188 public interface TaskExecutor { -189 enum Status { -190 DONE(), -191 ERR(), -192 RESIGNED(), -193 PREEMPTED() -194 } -195 Status exec(String name, CancelableProgressable p); -196 } -197 -198 /** -199 * Returns the number of tasks processed by coordination. -200 * This method is used by tests only -201 */ -202 @VisibleForTesting -203 public int getTaskReadySeq() { -204 return coordination.getTaskReadySeq(); -205 } -206} +021import java.io.FileNotFoundException; +022import java.io.IOException; +023import java.io.InterruptedIOException; +024import java.net.ConnectException; +025import java.net.SocketTimeoutException; +026 +027import org.apache.yetus.audience.InterfaceAudience; +028import org.slf4j.Logger; +029import org.slf4j.LoggerFactory; +030import org.apache.hadoop.conf.Configuration; +031import org.apache.hadoop.fs.FileSystem; +032import org.apache.hadoop.fs.Path; +033import org.apache.hadoop.hbase.NotServingRegionException; +034import org.apache.hadoop.hbase.Server; +035import org.apache.hadoop.hbase.client.RetriesExhaustedException; +036import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; +037import org.apache.hadoop.hbase.wal.WALFactory; +038import org.apache.hadoop.hbase.wal.WALSplitter; +039import org.apache.hadoop.hbase.util.CancelableProgressable; +040import org.apache.hadoop.hbase.util.ExceptionUtil; +041import org.apache.hadoop.hbase.util.FSUtils; +042 +043import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +044 +045/** +046 * This worker is spawned in every regionserver, including master. The Worker waits for log +047 * splitting tasks to be put up by the {@link org.apache.hadoop.hbase.master.SplitLogManager} +048 * running in the master and races with other workers in other serves to acquire those tasks. +049 * The coordination is done via coordination engine. +050 * <p> +051 * If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task. +052 * It keeps heart beating the manager by periodically moving the task from UNASSIGNED to OWNED +053 * state. On success it moves the task to TASK_DONE. On unrecoverable error it moves task state to +054 * ERR. If it cannot continue but wants the master to retry the task then it moves the task state to +055 * RESIGNED. +056 * <p> +057 * The manager can take a task away from a worker by moving the task from OWNED to UNASSIGNED. In +058 * the absence of a global lock there is a unavoidable race here - a worker might have just finished +059 * its task when it is stripped of its ownership. Here we rely on the idempotency of the log +060 * splitting task for correctness +061 */ +062@InterfaceAudience.Private +063public class SplitLogWorker implements Runnable { +064 +065 private static final Logger LOG = LoggerFactory.getLogger(SplitLogWorker.class); +066 +067 Thread worker; +068 // thread pool which executes recovery work +069 private SplitLogWorkerCoordination coordination; +070 private Configuration conf; +071 private RegionServerServices server; +072 +073 public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, +074 TaskExecutor splitTaskExecutor) { +075 this.server = server; +076 this.conf = conf; +077 this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination(); +078 coordination.init(server, conf, splitTaskExecutor, this); +079 } +080 +081 public SplitLogWorker(final Server hserver, final Configuration conf, +082 final RegionServerServices server, final LastSequenceId sequenceIdChecker, +083 final WALFactory factory) { +084 this(hserver, conf, server, new TaskExecutor() { +085 @Override +086 public Status exec(String filename, CancelableProgressable p) { +087 Path walDir; +088 FileSystem fs; +089 try { +090 walDir = FSUtils.getWALRootDir(conf); +091 fs = walDir.getFileSystem(conf); +092 } catch (IOException e) { +093 LOG.warn("could not find root dir or fs", e); +094 return Status.RESIGNED; +095 } +096 // TODO have to correctly figure out when log splitting has been +097 // interrupted or has encountered a transient error and when it has +098 // encountered a bad non-retry-able persistent error. +099 try { +100 if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), +101 fs, conf, p, sequenceIdChecker, +102 server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) { +103 return Status.PREEMPTED; +104 } +105 } catch (InterruptedIOException iioe) { +106 LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe); +107 return Status.RESIGNED; +108 } catch (IOException e) { +109 if (e instanceof FileNotFoundException) { +110 // A wal file may not exist anymore. Nothing can be recovered so move on +111 LOG.warn("WAL {} does not exist anymore", filename, e); +112 return Status.DONE; +113 } +114 Throwable cause = e.getCause(); +115 if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException +116 || cause instanceof ConnectException +117 || cause instanceof SocketTimeoutException)) { +118 LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " +119 + "resigning", e); +120 return Status.RESIGNED; +121 } else if (cause instanceof InterruptedException) { +122 LOG.warn("log splitting of " + filename + " interrupted, resigning", e); +123 return Status.RESIGNED; +124 } +125 LOG.warn("log splitting of " + filename + " failed, returning error", e); +126 return Status.ERR; +127 } +128 return Status.DONE; +129 } +130 }); +131 } +132 +133 @Override +134 public void run() { +135 try { +136 LOG.info("SplitLogWorker " + server.getServerName() + " starting"); +137 coordination.registerListener(); +138 // wait for Coordination Engine is ready +139 boolean res = false; +140 while (!res && !coordination.isStop()) { +141 res = coordination.isReady(); +142 } +143 if (!coordination.isStop()) { +144 coordination.taskLoop(); +145 } +146 } catch (Throwable t) { +147 if (ExceptionUtil.isInterrupt(t)) { +148 LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" : +149 " (ERROR: exitWorker is not set, exiting anyway)")); +150 } else { +151 // only a logical error can cause here. Printing it out +152 // to make debugging easier +153 LOG.error("unexpected error ", t); +154 } +155 } finally { +156 coordination.removeListener(); +157 LOG.info("SplitLogWorker " + server.getServerName() + " exiting"); +158 } +159 } +160 +161 /** +162 * If the worker is doing a task i.e. splitting a log file then stop the task. +163 * It doesn't exit the worker thread. +164 */ +165 public void stopTask() { +166 LOG.info("Sending interrupt to stop the worker thread"); +167 worker.interrupt(); // TODO interrupt often gets swallowed, do what else? +168 } +169 +170 /** +171 * start the SplitLogWorker thread +172 */ +173 public void start() { +174 worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName().toShortString()); +175 worker.start(); +176 } +177 +178 /** +179 * stop the SplitLogWorker thread +180 */ +181 public void stop() { +182 coordination.stopProcessingTasks(); +183 stopTask(); +184 } +185 +186 /** +187 * Objects implementing this interface actually do the task that has been +188 * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight +189 * guarantee that two workers will not be executing the same task therefore it +190 * is better to have workers prepare the task and then have the +191 * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in +192 * SplitLogManager.TaskFinisher +193 */ +194 public interface TaskExecutor { +195 enum Status { +196 DONE(), +197 ERR(), +198 RESIGNED(), +199 PREEMPTED() +200 } +201 Status exec(String name, CancelableProgressable p); +202 } +203 +204 /** +205 * Returns the number of tasks processed by coordination. +206 * This method is used by tests only +207 */ +208 @VisibleForTesting +209 public int getTaskReadySeq() { +210 return coordination.getTaskReadySeq(); +211 } +212}