Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 02918200C8F for ; Thu, 25 May 2017 16:59:39 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F3934160BDC; Thu, 25 May 2017 14:59:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 758FA160BD8 for ; Thu, 25 May 2017 16:59:35 +0200 (CEST) Received: (qmail 96581 invoked by uid 500); 25 May 2017 14:59:34 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 96376 invoked by uid 99); 25 May 2017 14:59:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 May 2017 14:59:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D86B9E117B; Thu, 25 May 2017 14:59:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Thu, 25 May 2017 14:59:38 -0000 Message-Id: <44cd6892fe924aa9b4d269a61b842674@git.apache.org> In-Reply-To: <8d5120da9fc94a3bba89b4778cc295d5@git.apache.org> References: <8d5120da9fc94a3bba89b4778cc295d5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/37] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Thu, 25 May 2017 14:59:39 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6cafca90/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.BulkLoadListener.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.BulkLoadListener.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.BulkLoadListener.html index 51c3f2c..b060ede 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.BulkLoadListener.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.BulkLoadListener.html @@ -753,7 +753,10 @@ 745 /** Wait for all current flushes and compactions of the region to complete */ 746 void waitForFlushesAndCompactions(); 747 -748} +748 /** Wait for all current flushes of the region to complete +749 */ +750 void waitForFlushes(); +751} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6cafca90/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.FlushResult.Result.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.FlushResult.Result.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.FlushResult.Result.html index 51c3f2c..b060ede 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.FlushResult.Result.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.FlushResult.Result.html @@ -753,7 +753,10 @@ 745 /** Wait for all current flushes and compactions of the region to complete */ 746 void waitForFlushesAndCompactions(); 747 -748} +748 /** Wait for all current flushes of the region to complete +749 */ +750 void waitForFlushes(); +751} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6cafca90/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.FlushResult.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.FlushResult.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.FlushResult.html index 51c3f2c..b060ede 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.FlushResult.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.FlushResult.html @@ -753,7 +753,10 @@ 745 /** Wait for all current flushes and compactions of the region to complete */ 746 void waitForFlushesAndCompactions(); 747 -748} +748 /** Wait for all current flushes of the region to complete +749 */ +750 void waitForFlushes(); +751} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6cafca90/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.Operation.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.Operation.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.Operation.html index 51c3f2c..b060ede 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.Operation.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.Operation.html @@ -753,7 +753,10 @@ 745 /** Wait for all current flushes and compactions of the region to complete */ 746 void waitForFlushesAndCompactions(); 747 -748} +748 /** Wait for all current flushes of the region to complete +749 */ +750 void waitForFlushes(); +751} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6cafca90/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.RowLock.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.RowLock.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.RowLock.html index 51c3f2c..b060ede 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.RowLock.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.RowLock.html @@ -753,7 +753,10 @@ 745 /** Wait for all current flushes and compactions of the region to complete */ 746 void waitForFlushesAndCompactions(); 747 -748} +748 /** Wait for all current flushes of the region to complete +749 */ +750 void waitForFlushes(); +751} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6cafca90/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.html index 51c3f2c..b060ede 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/Region.html @@ -753,7 +753,10 @@ 745 /** Wait for all current flushes and compactions of the region to complete */ 746 void waitForFlushesAndCompactions(); 747 -748} +748 /** Wait for all current flushes of the region to complete +749 */ +750 void waitForFlushes(); +751} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6cafca90/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.RegionSnapshotTask.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.RegionSnapshotTask.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.RegionSnapshotTask.html index 7575174..238f4d6 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.RegionSnapshotTask.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.RegionSnapshotTask.html @@ -38,158 +38,164 @@ 030import org.apache.hadoop.hbase.procedure.Subprocedure; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.Region; -033import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool; -034import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; -035import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; -036 -037/** -038 * This online snapshot implementation uses the distributed procedure framework to force a -039 * store flush and then records the hfiles. Its enter stage does nothing. Its leave stage then -040 * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and -041 * copies .regioninfos into the snapshot working directory. At the master side, there is an atomic -042 * rename of the working dir into the proper snapshot directory. -043 */ -044@InterfaceAudience.Private -045@InterfaceStability.Unstable -046public class FlushSnapshotSubprocedure extends Subprocedure { -047 private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class); -048 -049 private final List<Region> regions; -050 private final SnapshotDescription snapshot; -051 private final SnapshotSubprocedurePool taskManager; -052 private boolean snapshotSkipFlush = false; -053 -054 public FlushSnapshotSubprocedure(ProcedureMember member, -055 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, -056 List<Region> regions, SnapshotDescription snapshot, -057 SnapshotSubprocedurePool taskManager) { -058 super(member, snapshot.getName(), errorListener, wakeFrequency, timeout); -059 this.snapshot = snapshot; -060 -061 if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) { -062 snapshotSkipFlush = true; -063 } -064 this.regions = regions; -065 this.taskManager = taskManager; -066 } -067 -068 /** -069 * Callable for adding files to snapshot manifest working dir. Ready for multithreading. -070 */ -071 private class RegionSnapshotTask implements Callable<Void> { -072 Region region; -073 RegionSnapshotTask(Region region) { -074 this.region = region; -075 } -076 -077 @Override -078 public Void call() throws Exception { -079 // Taking the region read lock prevents the individual region from being closed while a -080 // snapshot is in progress. This is helpful but not sufficient for preventing races with -081 // snapshots that involve multiple regions and regionservers. It is still possible to have -082 // an interleaving such that globally regions are missing, so we still need the verification -083 // step. -084 LOG.debug("Starting region operation on " + region); -085 region.startRegionOperation(); -086 try { -087 if (snapshotSkipFlush) { -088 /* -089 * This is to take an online-snapshot without force a coordinated flush to prevent pause -090 * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure -091 * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be -092 * turned on/off based on the flush type. -093 * To minimized the code change, class name is not changed. -094 */ -095 LOG.debug("take snapshot without flush memstore first"); -096 } else { -097 LOG.debug("Flush Snapshotting region " + region.toString() + " started..."); -098 region.flush(true); -099 } -100 ((HRegion)region).addRegionToSnapshot(snapshot, monitor); -101 if (snapshotSkipFlush) { -102 LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed."); -103 } else { -104 LOG.debug("... Flush Snapshotting region " + region.toString() + " completed."); +033import org.apache.hadoop.hbase.regionserver.Region.FlushResult; +034import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool; +035import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; +036import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; +037 +038/** +039 * This online snapshot implementation uses the distributed procedure framework to force a +040 * store flush and then records the hfiles. Its enter stage does nothing. Its leave stage then +041 * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and +042 * copies .regioninfos into the snapshot working directory. At the master side, there is an atomic +043 * rename of the working dir into the proper snapshot directory. +044 */ +045@InterfaceAudience.Private +046@InterfaceStability.Unstable +047public class FlushSnapshotSubprocedure extends Subprocedure { +048 private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class); +049 +050 private final List<Region> regions; +051 private final SnapshotDescription snapshot; +052 private final SnapshotSubprocedurePool taskManager; +053 private boolean snapshotSkipFlush = false; +054 +055 public FlushSnapshotSubprocedure(ProcedureMember member, +056 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, +057 List<Region> regions, SnapshotDescription snapshot, +058 SnapshotSubprocedurePool taskManager) { +059 super(member, snapshot.getName(), errorListener, wakeFrequency, timeout); +060 this.snapshot = snapshot; +061 +062 if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) { +063 snapshotSkipFlush = true; +064 } +065 this.regions = regions; +066 this.taskManager = taskManager; +067 } +068 +069 /** +070 * Callable for adding files to snapshot manifest working dir. Ready for multithreading. +071 */ +072 private class RegionSnapshotTask implements Callable<Void> { +073 Region region; +074 RegionSnapshotTask(Region region) { +075 this.region = region; +076 } +077 +078 @Override +079 public Void call() throws Exception { +080 // Taking the region read lock prevents the individual region from being closed while a +081 // snapshot is in progress. This is helpful but not sufficient for preventing races with +082 // snapshots that involve multiple regions and regionservers. It is still possible to have +083 // an interleaving such that globally regions are missing, so we still need the verification +084 // step. +085 LOG.debug("Starting region operation on " + region); +086 region.startRegionOperation(); +087 try { +088 if (snapshotSkipFlush) { +089 /* +090 * This is to take an online-snapshot without force a coordinated flush to prevent pause +091 * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure +092 * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be +093 * turned on/off based on the flush type. +094 * To minimized the code change, class name is not changed. +095 */ +096 LOG.debug("take snapshot without flush memstore first"); +097 } else { +098 LOG.debug("Flush Snapshotting region " + region.toString() + " started..."); +099 FlushResult res = region.flush(true); +100 if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) { +101 // CANNOT_FLUSH may mean that a flush is already on-going +102 // we need to wait for that flush to complete +103 region.waitForFlushes(); +104 } 105 } -106 } finally { -107 LOG.debug("Closing region operation on " + region); -108 region.closeRegionOperation(); -109 } -110 return null; -111 } -112 } -113 -114 private void flushSnapshot() throws ForeignException { -115 if (regions.isEmpty()) { -116 // No regions on this RS, we are basically done. -117 return; -118 } +106 ((HRegion)region).addRegionToSnapshot(snapshot, monitor); +107 if (snapshotSkipFlush) { +108 LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed."); +109 } else { +110 LOG.debug("... Flush Snapshotting region " + region.toString() + " completed."); +111 } +112 } finally { +113 LOG.debug("Closing region operation on " + region); +114 region.closeRegionOperation(); +115 } +116 return null; +117 } +118 } 119 -120 monitor.rethrowException(); -121 -122 // assert that the taskManager is empty. -123 if (taskManager.hasTasks()) { -124 throw new IllegalStateException("Attempting to take snapshot " -125 + ClientSnapshotDescriptionUtils.toString(snapshot) -126 + " but we currently have outstanding tasks"); -127 } -128 -129 // Add all hfiles already existing in region. -130 for (Region region : regions) { -131 // submit one task per region for parallelize by region. -132 taskManager.submitTask(new RegionSnapshotTask(region)); -133 monitor.rethrowException(); -134 } -135 -136 // wait for everything to complete. -137 LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions"); -138 try { -139 taskManager.waitForOutstandingTasks(); -140 } catch (InterruptedException e) { -141 LOG.error("got interrupted exception for " + getMemberName()); -142 throw new ForeignException(getMemberName(), e); -143 } -144 } -145 -146 /** -147 * do nothing, core of snapshot is executed in {@link #insideBarrier} step. -148 */ -149 @Override -150 public void acquireBarrier() throws ForeignException { -151 // NO OP -152 } -153 -154 /** -155 * do a flush snapshot of every region on this rs from the target table. -156 */ -157 @Override -158 public byte[] insideBarrier() throws ForeignException { -159 flushSnapshot(); -160 return new byte[0]; -161 } -162 -163 /** -164 * Cancel threads if they haven't finished. -165 */ -166 @Override -167 public void cleanup(Exception e) { -168 LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '" -169 + snapshot.getName() + "' due to error", e); -170 try { -171 taskManager.cancelTasks(); -172 } catch (InterruptedException e1) { -173 Thread.currentThread().interrupt(); -174 } -175 } -176 -177 /** -178 * Hooray! -179 */ -180 public void releaseBarrier() { -181 // NO OP -182 } -183 -184} +120 private void flushSnapshot() throws ForeignException { +121 if (regions.isEmpty()) { +122 // No regions on this RS, we are basically done. +123 return; +124 } +125 +126 monitor.rethrowException(); +127 +128 // assert that the taskManager is empty. +129 if (taskManager.hasTasks()) { +130 throw new IllegalStateException("Attempting to take snapshot " +131 + ClientSnapshotDescriptionUtils.toString(snapshot) +132 + " but we currently have outstanding tasks"); +133 } +134 +135 // Add all hfiles already existing in region. +136 for (Region region : regions) { +137 // submit one task per region for parallelize by region. +138 taskManager.submitTask(new RegionSnapshotTask(region)); +139 monitor.rethrowException(); +140 } +141 +142 // wait for everything to complete. +143 LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions"); +144 try { +145 taskManager.waitForOutstandingTasks(); +146 } catch (InterruptedException e) { +147 LOG.error("got interrupted exception for " + getMemberName()); +148 throw new ForeignException(getMemberName(), e); +149 } +150 } +151 +152 /** +153 * do nothing, core of snapshot is executed in {@link #insideBarrier} step. +154 */ +155 @Override +156 public void acquireBarrier() throws ForeignException { +157 // NO OP +158 } +159 +160 /** +161 * do a flush snapshot of every region on this rs from the target table. +162 */ +163 @Override +164 public byte[] insideBarrier() throws ForeignException { +165 flushSnapshot(); +166 return new byte[0]; +167 } +168 +169 /** +170 * Cancel threads if they haven't finished. +171 */ +172 @Override +173 public void cleanup(Exception e) { +174 LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '" +175 + snapshot.getName() + "' due to error", e); +176 try { +177 taskManager.cancelTasks(); +178 } catch (InterruptedException e1) { +179 Thread.currentThread().interrupt(); +180 } +181 } +182 +183 /** +184 * Hooray! +185 */ +186 public void releaseBarrier() { +187 // NO OP +188 } +189 +190} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6cafca90/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.html index 7575174..238f4d6 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.html @@ -38,158 +38,164 @@ 030import org.apache.hadoop.hbase.procedure.Subprocedure; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.Region; -033import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool; -034import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; -035import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; -036 -037/** -038 * This online snapshot implementation uses the distributed procedure framework to force a -039 * store flush and then records the hfiles. Its enter stage does nothing. Its leave stage then -040 * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and -041 * copies .regioninfos into the snapshot working directory. At the master side, there is an atomic -042 * rename of the working dir into the proper snapshot directory. -043 */ -044@InterfaceAudience.Private -045@InterfaceStability.Unstable -046public class FlushSnapshotSubprocedure extends Subprocedure { -047 private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class); -048 -049 private final List<Region> regions; -050 private final SnapshotDescription snapshot; -051 private final SnapshotSubprocedurePool taskManager; -052 private boolean snapshotSkipFlush = false; -053 -054 public FlushSnapshotSubprocedure(ProcedureMember member, -055 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, -056 List<Region> regions, SnapshotDescription snapshot, -057 SnapshotSubprocedurePool taskManager) { -058 super(member, snapshot.getName(), errorListener, wakeFrequency, timeout); -059 this.snapshot = snapshot; -060 -061 if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) { -062 snapshotSkipFlush = true; -063 } -064 this.regions = regions; -065 this.taskManager = taskManager; -066 } -067 -068 /** -069 * Callable for adding files to snapshot manifest working dir. Ready for multithreading. -070 */ -071 private class RegionSnapshotTask implements Callable<Void> { -072 Region region; -073 RegionSnapshotTask(Region region) { -074 this.region = region; -075 } -076 -077 @Override -078 public Void call() throws Exception { -079 // Taking the region read lock prevents the individual region from being closed while a -080 // snapshot is in progress. This is helpful but not sufficient for preventing races with -081 // snapshots that involve multiple regions and regionservers. It is still possible to have -082 // an interleaving such that globally regions are missing, so we still need the verification -083 // step. -084 LOG.debug("Starting region operation on " + region); -085 region.startRegionOperation(); -086 try { -087 if (snapshotSkipFlush) { -088 /* -089 * This is to take an online-snapshot without force a coordinated flush to prevent pause -090 * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure -091 * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be -092 * turned on/off based on the flush type. -093 * To minimized the code change, class name is not changed. -094 */ -095 LOG.debug("take snapshot without flush memstore first"); -096 } else { -097 LOG.debug("Flush Snapshotting region " + region.toString() + " started..."); -098 region.flush(true); -099 } -100 ((HRegion)region).addRegionToSnapshot(snapshot, monitor); -101 if (snapshotSkipFlush) { -102 LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed."); -103 } else { -104 LOG.debug("... Flush Snapshotting region " + region.toString() + " completed."); +033import org.apache.hadoop.hbase.regionserver.Region.FlushResult; +034import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool; +035import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; +036import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; +037 +038/** +039 * This online snapshot implementation uses the distributed procedure framework to force a +040 * store flush and then records the hfiles. Its enter stage does nothing. Its leave stage then +041 * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and +042 * copies .regioninfos into the snapshot working directory. At the master side, there is an atomic +043 * rename of the working dir into the proper snapshot directory. +044 */ +045@InterfaceAudience.Private +046@InterfaceStability.Unstable +047public class FlushSnapshotSubprocedure extends Subprocedure { +048 private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class); +049 +050 private final List<Region> regions; +051 private final SnapshotDescription snapshot; +052 private final SnapshotSubprocedurePool taskManager; +053 private boolean snapshotSkipFlush = false; +054 +055 public FlushSnapshotSubprocedure(ProcedureMember member, +056 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, +057 List<Region> regions, SnapshotDescription snapshot, +058 SnapshotSubprocedurePool taskManager) { +059 super(member, snapshot.getName(), errorListener, wakeFrequency, timeout); +060 this.snapshot = snapshot; +061 +062 if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) { +063 snapshotSkipFlush = true; +064 } +065 this.regions = regions; +066 this.taskManager = taskManager; +067 } +068 +069 /** +070 * Callable for adding files to snapshot manifest working dir. Ready for multithreading. +071 */ +072 private class RegionSnapshotTask implements Callable<Void> { +073 Region region; +074 RegionSnapshotTask(Region region) { +075 this.region = region; +076 } +077 +078 @Override +079 public Void call() throws Exception { +080 // Taking the region read lock prevents the individual region from being closed while a +081 // snapshot is in progress. This is helpful but not sufficient for preventing races with +082 // snapshots that involve multiple regions and regionservers. It is still possible to have +083 // an interleaving such that globally regions are missing, so we still need the verification +084 // step. +085 LOG.debug("Starting region operation on " + region); +086 region.startRegionOperation(); +087 try { +088 if (snapshotSkipFlush) { +089 /* +090 * This is to take an online-snapshot without force a coordinated flush to prevent pause +091 * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure +092 * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be +093 * turned on/off based on the flush type. +094 * To minimized the code change, class name is not changed. +095 */ +096 LOG.debug("take snapshot without flush memstore first"); +097 } else { +098 LOG.debug("Flush Snapshotting region " + region.toString() + " started..."); +099 FlushResult res = region.flush(true); +100 if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) { +101 // CANNOT_FLUSH may mean that a flush is already on-going +102 // we need to wait for that flush to complete +103 region.waitForFlushes(); +104 } 105 } -106 } finally { -107 LOG.debug("Closing region operation on " + region); -108 region.closeRegionOperation(); -109 } -110 return null; -111 } -112 } -113 -114 private void flushSnapshot() throws ForeignException { -115 if (regions.isEmpty()) { -116 // No regions on this RS, we are basically done. -117 return; -118 } +106 ((HRegion)region).addRegionToSnapshot(snapshot, monitor); +107 if (snapshotSkipFlush) { +108 LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed."); +109 } else { +110 LOG.debug("... Flush Snapshotting region " + region.toString() + " completed."); +111 } +112 } finally { +113 LOG.debug("Closing region operation on " + region); +114 region.closeRegionOperation(); +115 } +116 return null; +117 } +118 } 119 -120 monitor.rethrowException(); -121 -122 // assert that the taskManager is empty. -123 if (taskManager.hasTasks()) { -124 throw new IllegalStateException("Attempting to take snapshot " -125 + ClientSnapshotDescriptionUtils.toString(snapshot) -126 + " but we currently have outstanding tasks"); -127 } -128 -129 // Add all hfiles already existing in region. -130 for (Region region : regions) { -131 // submit one task per region for parallelize by region. -132 taskManager.submitTask(new RegionSnapshotTask(region)); -133 monitor.rethrowException(); -134 } -135 -136 // wait for everything to complete. -137 LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions"); -138 try { -139 taskManager.waitForOutstandingTasks(); -140 } catch (InterruptedException e) { -141 LOG.error("got interrupted exception for " + getMemberName()); -142 throw new ForeignException(getMemberName(), e); -143 } -144 } -145 -146 /** -147 * do nothing, core of snapshot is executed in {@link #insideBarrier} step. -148 */ -149 @Override -150 public void acquireBarrier() throws ForeignException { -151 // NO OP -152 } -153 -154 /** -155 * do a flush snapshot of every region on this rs from the target table. -156 */ -157 @Override -158 public byte[] insideBarrier() throws ForeignException { -159 flushSnapshot(); -160 return new byte[0]; -161 } -162 -163 /** -164 * Cancel threads if they haven't finished. -165 */ -166 @Override -167 public void cleanup(Exception e) { -168 LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '" -169 + snapshot.getName() + "' due to error", e); -170 try { -171 taskManager.cancelTasks(); -172 } catch (InterruptedException e1) { -173 Thread.currentThread().interrupt(); -174 } -175 } -176 -177 /** -178 * Hooray! -179 */ -180 public void releaseBarrier() { -181 // NO OP -182 } -183 -184} +120 private void flushSnapshot() throws ForeignException { +121 if (regions.isEmpty()) { +122 // No regions on this RS, we are basically done. +123 return; +124 } +125 +126 monitor.rethrowException(); +127 +128 // assert that the taskManager is empty. +129 if (taskManager.hasTasks()) { +130 throw new IllegalStateException("Attempting to take snapshot " +131 + ClientSnapshotDescriptionUtils.toString(snapshot) +132 + " but we currently have outstanding tasks"); +133 } +134 +135 // Add all hfiles already existing in region. +136 for (Region region : regions) { +137 // submit one task per region for parallelize by region. +138 taskManager.submitTask(new RegionSnapshotTask(region)); +139 monitor.rethrowException(); +140 } +141 +142 // wait for everything to complete. +143 LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions"); +144 try { +145 taskManager.waitForOutstandingTasks(); +146 } catch (InterruptedException e) { +147 LOG.error("got interrupted exception for " + getMemberName()); +148 throw new ForeignException(getMemberName(), e); +149 } +150 } +151 +152 /** +153 * do nothing, core of snapshot is executed in {@link #insideBarrier} step. +154 */ +155 @Override +156 public void acquireBarrier() throws ForeignException { +157 // NO OP +158 } +159 +160 /** +161 * do a flush snapshot of every region on this rs from the target table. +162 */ +163 @Override +164 public byte[] insideBarrier() throws ForeignException { +165 flushSnapshot(); +166 return new byte[0]; +167 } +168 +169 /** +170 * Cancel threads if they haven't finished. +171 */ +172 @Override +173 public void cleanup(Exception e) { +174 LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '" +175 + snapshot.getName() + "' due to error", e); +176 try { +177 taskManager.cancelTasks(); +178 } catch (InterruptedException e1) { +179 Thread.currentThread().interrupt(); +180 } +181 } +182 +183 /** +184 * Hooray! +185 */ +186 public void releaseBarrier() { +187 // NO OP +188 } +189 +190}