Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C6597F8C8 for ; Sat, 6 Apr 2013 06:08:13 +0000 (UTC) Received: (qmail 52202 invoked by uid 500); 6 Apr 2013 06:08:13 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 52143 invoked by uid 500); 6 Apr 2013 06:08:13 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 52136 invoked by uid 99); 6 Apr 2013 06:08:12 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Apr 2013 06:08:12 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Apr 2013 06:07:51 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3D45B2388C9E; Sat, 6 Apr 2013 06:06:16 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1465198 [32/41] - in /hbase/hbase.apache.org/trunk: ./ css/ hbase-assembly/ images/ xref-test/ xref-test/org/apache/hadoop/hbase/ xref-test/org/apache/hadoop/hbase/client/ xref-test/org/apache/hadoop/hbase/client/replication/ xref-test/org... Date: Sat, 06 Apr 2013 06:06:09 -0000 To: commits@hbase.apache.org From: stack@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130406060616.3D45B2388C9E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/regionserver/Replication.html URL: http://svn.apache.org/viewvc/hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/regionserver/Replication.html?rev=1465198&r1=1465197&r2=1465198&view=diff ============================================================================== --- hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/regionserver/Replication.html (original) +++ hbase/hbase.apache.org/trunk/xref/org/apache/hadoop/hbase/replication/regionserver/Replication.html Sat Apr 6 06:06:07 2013 @@ -53,246 +53,256 @@ 43 import org.apache.hadoop.hbase.regionserver.wal.HLogKey; 44 import org.apache.hadoop.hbase.regionserver.wal.WALEdit; 45 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -46 import org.apache.hadoop.hbase.replication.ReplicationZookeeper; -47 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; -48 import org.apache.hadoop.hbase.util.Bytes; -49 import org.apache.zookeeper.KeeperException; -50 -51 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; -52 import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY; -53 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; -54 -55 /** -56 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. -57 */ -58 @InterfaceAudience.Private -59 public class Replication implements WALActionsListener, -60 ReplicationSourceService, ReplicationSinkService { -61 private static final Log LOG = -62 LogFactory.getLog(Replication.class); -63 private boolean replication; -64 private ReplicationSourceManager replicationManager; -65 private final AtomicBoolean replicating = new AtomicBoolean(true); -66 private ReplicationZookeeper zkHelper; -67 private Configuration conf; -68 private ReplicationSink replicationSink; -69 // Hosting server -70 private Server server; -71 /** Statistics thread schedule pool */ -72 private ScheduledExecutorService scheduleThreadPool; -73 private int statsThreadPeriod; -74 -75 /** -76 * Instantiate the replication management (if rep is enabled). -77 * @param server Hosting server -78 * @param fs handle to the filesystem -79 * @param logDir -80 * @param oldLogDir directory where logs are archived -81 * @throws IOException -82 */ -83 public Replication(final Server server, final FileSystem fs, -84 final Path logDir, final Path oldLogDir) throws IOException{ -85 initialize(server, fs, logDir, oldLogDir); -86 } -87 -88 /** -89 * Empty constructor -90 */ -91 public Replication() { -92 } -93 -94 public void initialize(final Server server, final FileSystem fs, -95 final Path logDir, final Path oldLogDir) throws IOException { -96 this.server = server; -97 this.conf = this.server.getConfiguration(); -98 this.replication = isReplication(this.conf); -99 this.scheduleThreadPool = Executors.newScheduledThreadPool(1, -100 new ThreadFactoryBuilder() -101 .setNameFormat(server.getServerName() + "Replication Statistics #%d") -102 .setDaemon(true) -103 .build()); -104 if (replication) { -105 try { -106 this.zkHelper = new ReplicationZookeeper(server, this.replicating); -107 } catch (KeeperException ke) { -108 throw new IOException("Failed replication handler create " + -109 "(replicating=" + this.replicating, ke); -110 } -111 this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs, -112 this.replicating, logDir, oldLogDir); -113 this.statsThreadPeriod = -114 this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); -115 LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); -116 } else { -117 this.replicationManager = null; -118 this.zkHelper = null; -119 } -120 } -121 -122 /** -123 * @param c Configuration to look at -124 * @return True if replication is enabled. -125 */ -126 public static boolean isReplication(final Configuration c) { -127 return c.getBoolean(REPLICATION_ENABLE_KEY, false); +46 import org.apache.hadoop.hbase.replication.ReplicationQueues; +47 import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; +48 import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +49 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +50 import org.apache.hadoop.hbase.util.Bytes; +51 import org.apache.zookeeper.KeeperException; +52 +53 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; +54 import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY; +55 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; +56 +57 /** +58 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. +59 */ +60 @InterfaceAudience.Private +61 public class Replication implements WALActionsListener, +62 ReplicationSourceService, ReplicationSinkService { +63 private static final Log LOG = +64 LogFactory.getLog(Replication.class); +65 private boolean replication; +66 private ReplicationSourceManager replicationManager; +67 private final AtomicBoolean replicating = new AtomicBoolean(true); +68 private ReplicationZookeeper zkHelper; +69 private ReplicationQueues replicationQueues; +70 private Configuration conf; +71 private ReplicationSink replicationSink; +72 // Hosting server +73 private Server server; +74 /** Statistics thread schedule pool */ +75 private ScheduledExecutorService scheduleThreadPool; +76 private int statsThreadPeriod; +77 +78 /** +79 * Instantiate the replication management (if rep is enabled). +80 * @param server Hosting server +81 * @param fs handle to the filesystem +82 * @param logDir +83 * @param oldLogDir directory where logs are archived +84 * @throws IOException +85 */ +86 public Replication(final Server server, final FileSystem fs, +87 final Path logDir, final Path oldLogDir) throws IOException{ +88 initialize(server, fs, logDir, oldLogDir); +89 } +90 +91 /** +92 * Empty constructor +93 */ +94 public Replication() { +95 } +96 +97 public void initialize(final Server server, final FileSystem fs, +98 final Path logDir, final Path oldLogDir) throws IOException { +99 this.server = server; +100 this.conf = this.server.getConfiguration(); +101 this.replication = isReplication(this.conf); +102 this.scheduleThreadPool = Executors.newScheduledThreadPool(1, +103 new ThreadFactoryBuilder() +104 .setNameFormat(server.getServerName() + "Replication Statistics #%d") +105 .setDaemon(true) +106 .build()); +107 if (replication) { +108 try { +109 this.zkHelper = new ReplicationZookeeper(server, this.replicating); +110 this.replicationQueues = +111 new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server); +112 this.replicationQueues.init(this.server.getServerName().toString()); +113 } catch (KeeperException ke) { +114 throw new IOException("Failed replication handler create " + +115 "(replicating=" + this.replicating, ke); +116 } +117 this.replicationManager = +118 new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, +119 this.replicating, logDir, oldLogDir); +120 this.statsThreadPeriod = +121 this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); +122 LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); +123 } else { +124 this.replicationManager = null; +125 this.zkHelper = null; +126 this.replicationQueues = null; +127 } 128 } 129 -130 /* -131 * Returns an object to listen to new hlog changes -132 **/ -133 public WALActionsListener getWALActionsListener() { -134 return this; -135 } -136 /** -137 * Stops replication service. -138 */ -139 public void stopReplicationService() { -140 join(); -141 } -142 -143 /** -144 * Join with the replication threads -145 */ -146 public void join() { -147 if (this.replication) { -148 this.replicationManager.join(); -149 this.replicationSink.stopReplicationSinkServices(); -150 } -151 } -152 -153 /** -154 * Carry on the list of log entries down to the sink -155 * @param entries list of entries to replicate -156 * @throws IOException -157 */ -158 public void replicateLogEntries(HLog.Entry[] entries) throws IOException { -159 if (this.replication) { -160 this.replicationSink.replicateEntries(entries); -161 } -162 } -163 -164 /** -165 * If replication is enabled and this cluster is a master, -166 * it starts -167 * @throws IOException -168 */ -169 public void startReplicationService() throws IOException { -170 if (this.replication) { -171 this.replicationManager.init(); -172 this.replicationSink = new ReplicationSink(this.conf, this.server); -173 this.scheduleThreadPool.scheduleAtFixedRate( -174 new ReplicationStatisticsThread(this.replicationSink, this.replicationManager), -175 statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); -176 } -177 } -178 -179 /** -180 * Get the replication sources manager -181 * @return the manager if replication is enabled, else returns false -182 */ -183 public ReplicationSourceManager getReplicationManager() { -184 return this.replicationManager; -185 } -186 -187 @Override -188 public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, -189 WALEdit logEdit) { -190 // Not interested -191 } -192 -193 @Override -194 public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, -195 WALEdit logEdit) { -196 NavigableMap<byte[], Integer> scopes = -197 new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); -198 byte[] family; -199 for (KeyValue kv : logEdit.getKeyValues()) { -200 family = kv.getFamily(); -201 int scope = htd.getFamily(family).getScope(); -202 if (scope != REPLICATION_SCOPE_LOCAL && -203 !scopes.containsKey(family)) { -204 scopes.put(family, scope); -205 } -206 } -207 if (!scopes.isEmpty()) { -208 logEdit.setScopes(scopes); -209 } -210 } -211 -212 @Override -213 public void preLogRoll(Path oldPath, Path newPath) throws IOException { -214 getReplicationManager().preLogRoll(newPath); -215 } -216 -217 @Override -218 public void postLogRoll(Path oldPath, Path newPath) throws IOException { -219 getReplicationManager().postLogRoll(newPath); +130 /** +131 * @param c Configuration to look at +132 * @return True if replication is enabled. +133 */ +134 public static boolean isReplication(final Configuration c) { +135 return c.getBoolean(REPLICATION_ENABLE_KEY, false); +136 } +137 +138 /* +139 * Returns an object to listen to new hlog changes +140 **/ +141 public WALActionsListener getWALActionsListener() { +142 return this; +143 } +144 /** +145 * Stops replication service. +146 */ +147 public void stopReplicationService() { +148 join(); +149 } +150 +151 /** +152 * Join with the replication threads +153 */ +154 public void join() { +155 if (this.replication) { +156 this.replicationManager.join(); +157 if (this.replicationSink != null) { +158 this.replicationSink.stopReplicationSinkServices(); +159 } +160 } +161 } +162 +163 /** +164 * Carry on the list of log entries down to the sink +165 * @param entries list of entries to replicate +166 * @throws IOException +167 */ +168 public void replicateLogEntries(HLog.Entry[] entries) throws IOException { +169 if (this.replication) { +170 this.replicationSink.replicateEntries(entries); +171 } +172 } +173 +174 /** +175 * If replication is enabled and this cluster is a master, +176 * it starts +177 * @throws IOException +178 */ +179 public void startReplicationService() throws IOException { +180 if (this.replication) { +181 this.replicationManager.init(); +182 this.replicationSink = new ReplicationSink(this.conf, this.server); +183 this.scheduleThreadPool.scheduleAtFixedRate( +184 new ReplicationStatisticsThread(this.replicationSink, this.replicationManager), +185 statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); +186 } +187 } +188 +189 /** +190 * Get the replication sources manager +191 * @return the manager if replication is enabled, else returns false +192 */ +193 public ReplicationSourceManager getReplicationManager() { +194 return this.replicationManager; +195 } +196 +197 @Override +198 public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, +199 WALEdit logEdit) { +200 // Not interested +201 } +202 +203 @Override +204 public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, +205 WALEdit logEdit) { +206 NavigableMap<byte[], Integer> scopes = +207 new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); +208 byte[] family; +209 for (KeyValue kv : logEdit.getKeyValues()) { +210 family = kv.getFamily(); +211 int scope = htd.getFamily(family).getScope(); +212 if (scope != REPLICATION_SCOPE_LOCAL && +213 !scopes.containsKey(family)) { +214 scopes.put(family, scope); +215 } +216 } +217 if (!scopes.isEmpty()) { +218 logEdit.setScopes(scopes); +219 } 220 } 221 222 @Override -223 public void preLogArchive(Path oldPath, Path newPath) throws IOException { -224 // Not interested +223 public void preLogRoll(Path oldPath, Path newPath) throws IOException { +224 getReplicationManager().preLogRoll(newPath); 225 } 226 227 @Override -228 public void postLogArchive(Path oldPath, Path newPath) throws IOException { -229 // Not interested +228 public void postLogRoll(Path oldPath, Path newPath) throws IOException { +229 getReplicationManager().postLogRoll(newPath); 230 } 231 -232 /** -233 * This method modifies the master's configuration in order to inject -234 * replication-related features -235 * @param conf -236 */ -237 public static void decorateMasterConfiguration(Configuration conf) { -238 if (!isReplication(conf)) { -239 return; -240 } -241 String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS); -242 String cleanerClass = ReplicationLogCleaner.class.getCanonicalName(); -243 if (!plugins.contains(cleanerClass)) { -244 conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass); -245 } -246 } -247 -248 @Override -249 public void logRollRequested() { -250 // Not interested -251 } -252 -253 @Override -254 public void logCloseRequested() { -255 // not interested +232 @Override +233 public void preLogArchive(Path oldPath, Path newPath) throws IOException { +234 // Not interested +235 } +236 +237 @Override +238 public void postLogArchive(Path oldPath, Path newPath) throws IOException { +239 // Not interested +240 } +241 +242 /** +243 * This method modifies the master's configuration in order to inject +244 * replication-related features +245 * @param conf +246 */ +247 public static void decorateMasterConfiguration(Configuration conf) { +248 if (!isReplication(conf)) { +249 return; +250 } +251 String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS); +252 String cleanerClass = ReplicationLogCleaner.class.getCanonicalName(); +253 if (!plugins.contains(cleanerClass)) { +254 conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass); +255 } 256 } 257 -258 /* -259 * Statistics thread. Periodically prints the cache statistics to the log. -260 */ -261 static class ReplicationStatisticsThread extends Thread { +258 @Override +259 public void logRollRequested() { +260 // Not interested +261 } 262 -263 private final ReplicationSink replicationSink; -264 private final ReplicationSourceManager replicationManager; -265 -266 public ReplicationStatisticsThread(final ReplicationSink replicationSink, -267 final ReplicationSourceManager replicationManager) { -268 super("ReplicationStatisticsThread"); -269 this.replicationManager = replicationManager; -270 this.replicationSink = replicationSink; -271 } +263 @Override +264 public void logCloseRequested() { +265 // not interested +266 } +267 +268 /* +269 * Statistics thread. Periodically prints the cache statistics to the log. +270 */ +271 static class ReplicationStatisticsThread extends Thread { 272 -273 @Override -274 public void run() { -275 printStats(this.replicationManager.getStats()); -276 printStats(this.replicationSink.getStats()); -277 } -278 -279 private void printStats(String stats) { -280 if (!stats.isEmpty()) { -281 LOG.info(stats); -282 } -283 } -284 } -285 } +273 private final ReplicationSink replicationSink; +274 private final ReplicationSourceManager replicationManager; +275 +276 public ReplicationStatisticsThread(final ReplicationSink replicationSink, +277 final ReplicationSourceManager replicationManager) { +278 super("ReplicationStatisticsThread"); +279 this.replicationManager = replicationManager; +280 this.replicationSink = replicationSink; +281 } +282 +283 @Override +284 public void run() { +285 printStats(this.replicationManager.getStats()); +286 printStats(this.replicationSink.getStats()); +287 } +288 +289 private void printStats(String stats) { +290 if (!stats.isEmpty()) { +291 LOG.info(stats); +292 } +293 } +294 } +295 }