Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 656F6200C48 for ; Thu, 6 Apr 2017 16:30:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6423B160B84; Thu, 6 Apr 2017 14:30:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B0A9E160BA9 for ; Thu, 6 Apr 2017 16:30:19 +0200 (CEST) Received: (qmail 89316 invoked by uid 500); 6 Apr 2017 14:30:18 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 89070 invoked by uid 99); 6 Apr 2017 14:30:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Apr 2017 14:30:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ED7A5E8F01; Thu, 6 Apr 2017 14:30:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Thu, 06 Apr 2017 14:30:20 -0000 Message-Id: <41395e3ed6144da49783861f59f8f090@git.apache.org> In-Reply-To: <84d7f55c288b41dfa8f6065d19392563@git.apache.org> References: <84d7f55c288b41dfa8f6065d19392563@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/9] hbase-site git commit: Published site at ec5188df3090d42088b6f4cb8f0c2fd49425f8c1. archived-at: Thu, 06 Apr 2017 14:30:22 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/9a2dcf32/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.Verifier.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.Verifier.html b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.Verifier.html index d7eae79..3492ff5 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.Verifier.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.Verifier.html @@ -85,7 +85,7 @@ 077 private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; 078 static long startTime = 0; 079 static long endTime = Long.MAX_VALUE; -080 static int batch = Integer.MAX_VALUE; +080 static int batch = -1; 081 static int versions = -1; 082 static String tableName = null; 083 static String families = null; @@ -118,464 +118,475 @@ 110 private int sleepMsBeforeReCompare; 111 private String delimiter = ""; 112 private boolean verbose = false; -113 -114 /** -115 * Map method that compares every scanned row with the equivalent from -116 * a distant cluster. -117 * @param row The current table row key. -118 * @param value The columns. -119 * @param context The current context. -120 * @throws IOException When something is broken with the data. -121 */ -122 @Override -123 public void map(ImmutableBytesWritable row, final Result value, -124 Context context) -125 throws IOException { -126 if (replicatedScanner == null) { -127 Configuration conf = context.getConfiguration(); -128 sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0); -129 delimiter = conf.get(NAME + ".delimiter", ""); -130 verbose = conf.getBoolean(NAME +".verbose", false); -131 final Scan scan = new Scan(); -132 scan.setBatch(batch); -133 scan.setCacheBlocks(false); -134 scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1)); -135 long startTime = conf.getLong(NAME + ".startTime", 0); -136 long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE); -137 String families = conf.get(NAME + ".families", null); -138 if(families != null) { -139 String[] fams = families.split(","); -140 for(String fam : fams) { -141 scan.addFamily(Bytes.toBytes(fam)); -142 } -143 } -144 boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false); -145 scan.setRaw(includeDeletedCells); -146 String rowPrefixes = conf.get(NAME + ".rowPrefixes", null); -147 setRowPrefixFilter(scan, rowPrefixes); -148 scan.setTimeRange(startTime, endTime); -149 int versions = conf.getInt(NAME+".versions", -1); -150 LOG.info("Setting number of version inside map as: " + versions); -151 if (versions >= 0) { -152 scan.setMaxVersions(versions); -153 } -154 TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); -155 sourceConnection = ConnectionFactory.createConnection(conf); -156 sourceTable = sourceConnection.getTable(tableName); -157 -158 final TableSplit tableSplit = (TableSplit)(context.getInputSplit()); -159 -160 String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); -161 Configuration peerConf = HBaseConfiguration.createClusterConf(conf, -162 zkClusterKey, PEER_CONFIG_PREFIX); +113 private int batch = -1; +114 +115 /** +116 * Map method that compares every scanned row with the equivalent from +117 * a distant cluster. +118 * @param row The current table row key. +119 * @param value The columns. +120 * @param context The current context. +121 * @throws IOException When something is broken with the data. +122 */ +123 @Override +124 public void map(ImmutableBytesWritable row, final Result value, +125 Context context) +126 throws IOException { +127 if (replicatedScanner == null) { +128 Configuration conf = context.getConfiguration(); +129 sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0); +130 delimiter = conf.get(NAME + ".delimiter", ""); +131 verbose = conf.getBoolean(NAME +".verbose", false); +132 batch = conf.getInt(NAME + ".batch", -1); +133 final Scan scan = new Scan(); +134 if (batch > 0) { +135 scan.setBatch(batch); +136 } +137 scan.setCacheBlocks(false); +138 scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1)); +139 long startTime = conf.getLong(NAME + ".startTime", 0); +140 long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE); +141 String families = conf.get(NAME + ".families", null); +142 if(families != null) { +143 String[] fams = families.split(","); +144 for(String fam : fams) { +145 scan.addFamily(Bytes.toBytes(fam)); +146 } +147 } +148 boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false); +149 scan.setRaw(includeDeletedCells); +150 String rowPrefixes = conf.get(NAME + ".rowPrefixes", null); +151 setRowPrefixFilter(scan, rowPrefixes); +152 scan.setTimeRange(startTime, endTime); +153 int versions = conf.getInt(NAME+".versions", -1); +154 LOG.info("Setting number of version inside map as: " + versions); +155 if (versions >= 0) { +156 scan.setMaxVersions(versions); +157 } +158 TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); +159 sourceConnection = ConnectionFactory.createConnection(conf); +160 sourceTable = sourceConnection.getTable(tableName); +161 +162 final TableSplit tableSplit = (TableSplit)(context.getInputSplit()); 163 -164 replicatedConnection = ConnectionFactory.createConnection(peerConf); -165 replicatedTable = replicatedConnection.getTable(tableName); -166 scan.setStartRow(value.getRow()); -167 scan.setStopRow(tableSplit.getEndRow()); -168 replicatedScanner = replicatedTable.getScanner(scan); -169 currentCompareRowInPeerTable = replicatedScanner.next(); -170 } -171 while (true) { -172 if (currentCompareRowInPeerTable == null) { -173 // reach the region end of peer table, row only in source table -174 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); -175 break; -176 } -177 int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); -178 if (rowCmpRet == 0) { -179 // rowkey is same, need to compare the content of the row -180 try { -181 Result.compareResults(value, currentCompareRowInPeerTable); -182 context.getCounter(Counters.GOODROWS).increment(1); -183 if (verbose) { -184 LOG.info("Good row key: " + delimiter -185 + Bytes.toStringBinary(value.getRow()) + delimiter); -186 } -187 } catch (Exception e) { -188 logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); -189 } -190 currentCompareRowInPeerTable = replicatedScanner.next(); -191 break; -192 } else if (rowCmpRet < 0) { -193 // row only exists in source table -194 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); +164 String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); +165 Configuration peerConf = HBaseConfiguration.createClusterConf(conf, +166 zkClusterKey, PEER_CONFIG_PREFIX); +167 +168 replicatedConnection = ConnectionFactory.createConnection(peerConf); +169 replicatedTable = replicatedConnection.getTable(tableName); +170 scan.setStartRow(value.getRow()); +171 scan.setStopRow(tableSplit.getEndRow()); +172 replicatedScanner = replicatedTable.getScanner(scan); +173 currentCompareRowInPeerTable = replicatedScanner.next(); +174 } +175 while (true) { +176 if (currentCompareRowInPeerTable == null) { +177 // reach the region end of peer table, row only in source table +178 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); +179 break; +180 } +181 int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); +182 if (rowCmpRet == 0) { +183 // rowkey is same, need to compare the content of the row +184 try { +185 Result.compareResults(value, currentCompareRowInPeerTable); +186 context.getCounter(Counters.GOODROWS).increment(1); +187 if (verbose) { +188 LOG.info("Good row key: " + delimiter +189 + Bytes.toStringBinary(value.getRow()) + delimiter); +190 } +191 } catch (Exception e) { +192 logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); +193 } +194 currentCompareRowInPeerTable = replicatedScanner.next(); 195 break; -196 } else { -197 // row only exists in peer table -198 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, -199 currentCompareRowInPeerTable); -200 currentCompareRowInPeerTable = replicatedScanner.next(); -201 } -202 } -203 } -204 -205 private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { -206 if (sleepMsBeforeReCompare > 0) { -207 Threads.sleep(sleepMsBeforeReCompare); -208 try { -209 Result sourceResult = sourceTable.get(new Get(row.getRow())); -210 Result replicatedResult = replicatedTable.get(new Get(row.getRow())); -211 Result.compareResults(sourceResult, replicatedResult); -212 if (!sourceResult.isEmpty()) { -213 context.getCounter(Counters.GOODROWS).increment(1); -214 if (verbose) { -215 LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow()) -216 + delimiter); -217 } -218 } -219 return; -220 } catch (Exception e) { -221 LOG.error("recompare fail after sleep, rowkey=" + delimiter + -222 Bytes.toStringBinary(row.getRow()) + delimiter); -223 } -224 } -225 context.getCounter(counter).increment(1); -226 context.getCounter(Counters.BADROWS).increment(1); -227 LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) + -228 delimiter); -229 } -230 -231 @Override -232 protected void cleanup(Context context) { -233 if (replicatedScanner != null) { -234 try { -235 while (currentCompareRowInPeerTable != null) { -236 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, -237 currentCompareRowInPeerTable); -238 currentCompareRowInPeerTable = replicatedScanner.next(); -239 } -240 } catch (Exception e) { -241 LOG.error("fail to scan peer table in cleanup", e); -242 } finally { -243 replicatedScanner.close(); -244 replicatedScanner = null; -245 } -246 } -247 -248 if (sourceTable != null) { -249 try { -250 sourceTable.close(); -251 } catch (IOException e) { -252 LOG.error("fail to close source table in cleanup", e); -253 } -254 } -255 if(sourceConnection != null){ -256 try { -257 sourceConnection.close(); -258 } catch (Exception e) { -259 LOG.error("fail to close source connection in cleanup", e); -260 } -261 } -262 -263 if(replicatedTable != null){ -264 try{ -265 replicatedTable.close(); -266 } catch (Exception e) { -267 LOG.error("fail to close replicated table in cleanup", e); -268 } -269 } -270 if(replicatedConnection != null){ -271 try { -272 replicatedConnection.close(); -273 } catch (Exception e) { -274 LOG.error("fail to close replicated connection in cleanup", e); -275 } -276 } -277 } -278 } -279 -280 private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig( -281 final Configuration conf) throws IOException { -282 ZooKeeperWatcher localZKW = null; -283 ReplicationPeerZKImpl peer = null; -284 try { -285 localZKW = new ZooKeeperWatcher(conf, "VerifyReplication", -286 new Abortable() { -287 @Override public void abort(String why, Throwable e) {} -288 @Override public boolean isAborted() {return false;} -289 }); -290 -291 ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW); -292 rp.init(); -293 -294 Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId); -295 if (pair == null) { -296 throw new IOException("Couldn't get peer conf!"); -297 } -298 -299 return pair; -300 } catch (ReplicationException e) { -301 throw new IOException( -302 "An error occurred while trying to connect to the remove peer cluster", e); -303 } finally { -304 if (peer != null) { -305 peer.close(); -306 } -307 if (localZKW != null) { -308 localZKW.close(); -309 } -310 } -311 } -312 -313 /** -314 * Sets up the actual job. -315 * -316 * @param conf The current configuration. -317 * @param args The command line parameters. -318 * @return The newly created job. -319 * @throws java.io.IOException When setting up the job fails. -320 */ -321 public static Job createSubmittableJob(Configuration conf, String[] args) -322 throws IOException { -323 if (!doCommandLine(args)) { -324 return null; -325 } -326 conf.set(NAME+".peerId", peerId); -327 conf.set(NAME+".tableName", tableName); -328 conf.setLong(NAME+".startTime", startTime); -329 conf.setLong(NAME+".endTime", endTime); -330 conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare); -331 conf.set(NAME + ".delimiter", delimiter); -332 conf.setBoolean(NAME +".verbose", verbose); -333 conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells); -334 if (families != null) { -335 conf.set(NAME+".families", families); -336 } -337 if (rowPrefixes != null){ -338 conf.set(NAME+".rowPrefixes", rowPrefixes); -339 } -340 -341 Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf); -342 ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); -343 String peerQuorumAddress = peerConfig.getClusterKey(); -344 LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + -345 peerConfig.getConfiguration()); -346 conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); -347 HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, -348 peerConfig.getConfiguration().entrySet()); -349 -350 conf.setInt(NAME + ".versions", versions); -351 LOG.info("Number of version: " + versions); -352 -353 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); -354 job.setJarByClass(VerifyReplication.class); -355 -356 Scan scan = new Scan(); -357 scan.setTimeRange(startTime, endTime); -358 scan.setRaw(includeDeletedCells); -359 if (versions >= 0) { -360 scan.setMaxVersions(versions); -361 LOG.info("Number of versions set to " + versions); -362 } -363 if(families != null) { -364 String[] fams = families.split(","); -365 for(String fam : fams) { -366 scan.addFamily(Bytes.toBytes(fam)); -367 } -368 } -369 -370 setRowPrefixFilter(scan, rowPrefixes); -371 -372 TableMapReduceUtil.initTableMapperJob(tableName, scan, -373 Verifier.class, null, null, job); -374 -375 Configuration peerClusterConf = peerConfigPair.getSecond(); -376 // Obtain the auth token from peer cluster -377 TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); +196 } else if (rowCmpRet < 0) { +197 // row only exists in source table +198 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); +199 break; +200 } else { +201 // row only exists in peer table +202 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, +203 currentCompareRowInPeerTable); +204 currentCompareRowInPeerTable = replicatedScanner.next(); +205 } +206 } +207 } +208 +209 private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { +210 if (sleepMsBeforeReCompare > 0) { +211 Threads.sleep(sleepMsBeforeReCompare); +212 try { +213 Result sourceResult = sourceTable.get(new Get(row.getRow())); +214 Result replicatedResult = replicatedTable.get(new Get(row.getRow())); +215 Result.compareResults(sourceResult, replicatedResult); +216 if (!sourceResult.isEmpty()) { +217 context.getCounter(Counters.GOODROWS).increment(1); +218 if (verbose) { +219 LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow()) +220 + delimiter); +221 } +222 } +223 return; +224 } catch (Exception e) { +225 LOG.error("recompare fail after sleep, rowkey=" + delimiter + +226 Bytes.toStringBinary(row.getRow()) + delimiter); +227 } +228 } +229 context.getCounter(counter).increment(1); +230 context.getCounter(Counters.BADROWS).increment(1); +231 LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) + +232 delimiter); +233 } +234 +235 @Override +236 protected void cleanup(Context context) { +237 if (replicatedScanner != null) { +238 try { +239 while (currentCompareRowInPeerTable != null) { +240 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, +241 currentCompareRowInPeerTable); +242 currentCompareRowInPeerTable = replicatedScanner.next(); +243 } +244 } catch (Exception e) { +245 LOG.error("fail to scan peer table in cleanup", e); +246 } finally { +247 replicatedScanner.close(); +248 replicatedScanner = null; +249 } +250 } +251 +252 if (sourceTable != null) { +253 try { +254 sourceTable.close(); +255 } catch (IOException e) { +256 LOG.error("fail to close source table in cleanup", e); +257 } +258 } +259 if(sourceConnection != null){ +260 try { +261 sourceConnection.close(); +262 } catch (Exception e) { +263 LOG.error("fail to close source connection in cleanup", e); +264 } +265 } +266 +267 if(replicatedTable != null){ +268 try{ +269 replicatedTable.close(); +270 } catch (Exception e) { +271 LOG.error("fail to close replicated table in cleanup", e); +272 } +273 } +274 if(replicatedConnection != null){ +275 try { +276 replicatedConnection.close(); +277 } catch (Exception e) { +278 LOG.error("fail to close replicated connection in cleanup", e); +279 } +280 } +281 } +282 } +283 +284 private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig( +285 final Configuration conf) throws IOException { +286 ZooKeeperWatcher localZKW = null; +287 ReplicationPeerZKImpl peer = null; +288 try { +289 localZKW = new ZooKeeperWatcher(conf, "VerifyReplication", +290 new Abortable() { +291 @Override public void abort(String why, Throwable e) {} +292 @Override public boolean isAborted() {return false;} +293 }); +294 +295 ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW); +296 rp.init(); +297 +298 Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId); +299 if (pair == null) { +300 throw new IOException("Couldn't get peer conf!"); +301 } +302 +303 return pair; +304 } catch (ReplicationException e) { +305 throw new IOException( +306 "An error occurred while trying to connect to the remove peer cluster", e); +307 } finally { +308 if (peer != null) { +309 peer.close(); +310 } +311 if (localZKW != null) { +312 localZKW.close(); +313 } +314 } +315 } +316 +317 /** +318 * Sets up the actual job. +319 * +320 * @param conf The current configuration. +321 * @param args The command line parameters. +322 * @return The newly created job. +323 * @throws java.io.IOException When setting up the job fails. +324 */ +325 public static Job createSubmittableJob(Configuration conf, String[] args) +326 throws IOException { +327 if (!doCommandLine(args)) { +328 return null; +329 } +330 conf.set(NAME+".peerId", peerId); +331 conf.set(NAME+".tableName", tableName); +332 conf.setLong(NAME+".startTime", startTime); +333 conf.setLong(NAME+".endTime", endTime); +334 conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare); +335 conf.set(NAME + ".delimiter", delimiter); +336 conf.setInt(NAME + ".batch", batch); +337 conf.setBoolean(NAME +".verbose", verbose); +338 conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells); +339 if (families != null) { +340 conf.set(NAME+".families", families); +341 } +342 if (rowPrefixes != null){ +343 conf.set(NAME+".rowPrefixes", rowPrefixes); +344 } +345 +346 Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf); +347 ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); +348 String peerQuorumAddress = peerConfig.getClusterKey(); +349 LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + +350 peerConfig.getConfiguration()); +351 conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); +352 HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, +353 peerConfig.getConfiguration().entrySet()); +354 +355 conf.setInt(NAME + ".versions", versions); +356 LOG.info("Number of version: " + versions); +357 +358 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); +359 job.setJarByClass(VerifyReplication.class); +360 +361 Scan scan = new Scan(); +362 scan.setTimeRange(startTime, endTime); +363 scan.setRaw(includeDeletedCells); +364 scan.setCacheBlocks(false); +365 if (batch > 0) { +366 scan.setBatch(batch); +367 } +368 if (versions >= 0) { +369 scan.setMaxVersions(versions); +370 LOG.info("Number of versions set to " + versions); +371 } +372 if(families != null) { +373 String[] fams = families.split(","); +374 for(String fam : fams) { +375 scan.addFamily(Bytes.toBytes(fam)); +376 } +377 } 378 -379 job.setOutputFormatClass(NullOutputFormat.class); -380 job.setNumReduceTasks(0); -381 return job; -382 } +379 setRowPrefixFilter(scan, rowPrefixes); +380 +381 TableMapReduceUtil.initTableMapperJob(tableName, scan, +382 Verifier.class, null, null, job); 383 -384 private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { -385 if (rowPrefixes != null && !rowPrefixes.isEmpty()) { -386 String[] rowPrefixArray = rowPrefixes.split(","); -387 Arrays.sort(rowPrefixArray); -388 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); -389 for (String prefix : rowPrefixArray) { -390 Filter filter = new PrefixFilter(Bytes.toBytes(prefix)); -391 filterList.addFilter(filter); -392 } -393 scan.setFilter(filterList); -394 byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]); -395 byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]); -396 setStartAndStopRows(scan, startPrefixRow, lastPrefixRow); -397 } -398 } -399 -400 private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) { -401 scan.setStartRow(startPrefixRow); -402 byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1), -403 new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)}); -404 scan.setStopRow(stopRow); -405 } -406 -407 private static boolean doCommandLine(final String[] args) { -408 if (args.length < 2) { -409 printUsage(null); -410 return false; -411 } -412 //in case we've been run before, restore all parameters to their initial states -413 //Otherwise, if our previous run included a parameter not in args this time, -414 //we might hold on to the old value. -415 restoreDefaults(); -416 try { -417 for (int i = 0; i < args.length; i++) { -418 String cmd = args[i]; -419 if (cmd.equals("-h") || cmd.startsWith("--h")) { -420 printUsage(null); -421 return false; -422 } -423 -424 final String startTimeArgKey = "--starttime="; -425 if (cmd.startsWith(startTimeArgKey)) { -426 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); -427 continue; -428 } -429 -430 final String endTimeArgKey = "--endtime="; -431 if (cmd.startsWith(endTimeArgKey)) { -432 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); -433 continue; -434 } -435 -436 final String includeDeletedCellsArgKey = "--raw"; -437 if (cmd.equals(includeDeletedCellsArgKey)) { -438 includeDeletedCells = true; -439 continue; -440 } -441 -442 final String versionsArgKey = "--versions="; -443 if (cmd.startsWith(versionsArgKey)) { -444 versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); -445 continue; -446 } -447 -448 final String batchArgKey = "--batch="; -449 if (cmd.startsWith(batchArgKey)) { -450 batch = Integer.parseInt(cmd.substring(batchArgKey.length())); -451 continue; -452 } -453 -454 final String familiesArgKey = "--families="; -455 if (cmd.startsWith(familiesArgKey)) { -456 families = cmd.substring(familiesArgKey.length()); -457 continue; -458 } -459 -460 final String rowPrefixesKey = "--row-prefixes="; -461 if (cmd.startsWith(rowPrefixesKey)){ -462 rowPrefixes = cmd.substring(rowPrefixesKey.length()); -463 continue; -464 } -465 -466 final String delimiterArgKey = "--delimiter="; -467 if (cmd.startsWith(delimiterArgKey)) { -468 delimiter = cmd.substring(delimiterArgKey.length()); -469 continue; -470 } -471 -472 final String sleepToReCompareKey = "--recomparesleep="; -473 if (cmd.startsWith(sleepToReCompareKey)) { -474 sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length())); -475 continue; -476 } -477 final String verboseKey = "--verbose"; -478 if (cmd.startsWith(verboseKey)) { -479 verbose = true; -480 continue; -481 } -482 -483 if (cmd.startsWith("--")) { -484 printUsage("Invalid argument '" + cmd + "'"); +384 Configuration peerClusterConf = peerConfigPair.getSecond(); +385 // Obtain the auth token from peer cluster +386 TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); +387 +388 job.setOutputFormatClass(NullOutputFormat.class); +389 job.setNumReduceTasks(0); +390 return job; +391 } +392 +393 private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { +394 if (rowPrefixes != null && !rowPrefixes.isEmpty()) { +395 String[] rowPrefixArray = rowPrefixes.split(","); +396 Arrays.sort(rowPrefixArray); +397 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); +398 for (String prefix : rowPrefixArray) { +399 Filter filter = new PrefixFilter(Bytes.toBytes(prefix)); +400 filterList.addFilter(filter); +401 } +402 scan.setFilter(filterList); +403 byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]); +404 byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]); +405 setStartAndStopRows(scan, startPrefixRow, lastPrefixRow); +406 } +407 } +408 +409 private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) { +410 scan.setStartRow(startPrefixRow); +411 byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1), +412 new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)}); +413 scan.setStopRow(stopRow); +414 } +415 +416 private static boolean doCommandLine(final String[] args) { +417 if (args.length < 2) { +418 printUsage(null); +419 return false; +420 } +421 //in case we've been run before, restore all parameters to their initial states +422 //Otherwise, if our previous run included a parameter not in args this time, +423 //we might hold on to the old value. +424 restoreDefaults(); +425 try { +426 for (int i = 0; i < args.length; i++) { +427 String cmd = args[i]; +428 if (cmd.equals("-h") || cmd.startsWith("--h")) { +429 printUsage(null); +430 return false; +431 } +432 +433 final String startTimeArgKey = "--starttime="; +434 if (cmd.startsWith(startTimeArgKey)) { +435 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); +436 continue; +437 } +438 +439 final String endTimeArgKey = "--endtime="; +440 if (cmd.startsWith(endTimeArgKey)) { +441 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); +442 continue; +443 } +444 +445 final String includeDeletedCellsArgKey = "--raw"; +446 if (cmd.equals(includeDeletedCellsArgKey)) { +447 includeDeletedCells = true; +448 continue; +449 } +450 +451 final String versionsArgKey = "--versions="; +452 if (cmd.startsWith(versionsArgKey)) { +453 versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); +454 continue; +455 } +456 +457 final String batchArgKey = "--batch="; +458 if (cmd.startsWith(batchArgKey)) { +459 batch = Integer.parseInt(cmd.substring(batchArgKey.length())); +460 continue; +461 } +462 +463 final String familiesArgKey = "--families="; +464 if (cmd.startsWith(familiesArgKey)) { +465 families = cmd.substring(familiesArgKey.length()); +466 continue; +467 } +468 +469 final String rowPrefixesKey = "--row-prefixes="; +470 if (cmd.startsWith(rowPrefixesKey)){ +471 rowPrefixes = cmd.substring(rowPrefixesKey.length()); +472 continue; +473 } +474 +475 final String delimiterArgKey = "--delimiter="; +476 if (cmd.startsWith(delimiterArgKey)) { +477 delimiter = cmd.substring(delimiterArgKey.length()); +478 continue; +479 } +480 +481 final String sleepToReCompareKey = "--recomparesleep="; +482 if (cmd.startsWith(sleepToReCompareKey)) { +483 sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length())); +484 continue; 485 } -486 -487 if (i == args.length-2) { -488 peerId = cmd; -489 } -490 -491 if (i == args.length-1) { -492 tableName = cmd; -493 } -494 } -495 } catch (Exception e) { -496 e.printStackTrace(); -497 printUsage("Can't start because " + e.getMessage()); -498 return false; -499 } -500 return true; -501 } -502 -503 private static void restoreDefaults() { -504 startTime = 0; -505 endTime = Long.MAX_VALUE; -506 batch = Integer.MAX_VALUE; -507 versions = -1; -508 tableName = null; -509 families = null; -510 peerId = null; -511 rowPrefixes = null; -512 includeDeletedCells = false; -513 } -514 -515 /* -516 * @param errorMsg Error message. Can be null. -517 */ -518 private static void printUsage(final String errorMsg) { -519 if (errorMsg != null && errorMsg.length() > 0) { -520 System.err.println("ERROR: " + errorMsg); -521 } -522 System.err.println("Usage: verifyrep [--starttime=X]" + -523 " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + -524 "[--verbose] <peerid> <tablename>"); -525 System.err.println(); -526 System.err.println("Options:"); -527 System.err.println(" starttime beginning of the time range"); -528 System.err.println(" without endtime means from starttime to forever"); -529 System.err.println(" endtime end of the time range"); -530 System.err.println(" versions number of cell versions to verify"); -531 System.err.println(" raw includes raw scan if given in options"); -532 System.err.println(" families comma-separated list of families to copy"); -533 System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); -534 System.err.println(" delimiter the delimiter used in display around rowkey"); -535 System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + -536 "default value is 0 which disables the recompare."); -537 System.err.println(" verbose logs row keys of good rows"); -538 System.err.println(); -539 System.err.println("Args:"); -540 System.err.println(" peerid Id of the peer used for verification, must match the one given for replication"); -541 System.err.println(" tablename Name of the table to verify"); -542 System.err.println(); -543 System.err.println("Examples:"); -544 System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 "); -545 System.err.println(" $ hbase " + -546 "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" + -547 " --starttime=1265875194289 --endtime=1265878794289 5 TestTable "); -548 } -549 -550 @Override -551 public int run(String[] args) throws Exception { -552 Configuration conf = this.getConf(); -553 Job job = createSubmittableJob(conf, args); -554 if (job != null) { -555 return job.waitForCompletion(true) ? 0 : 1; -556 } -557 return 1; -558 } -559 -560 /** -561 * Main entry point. -562 * -563 * @param args The command line parameters. -564 * @throws Exception When running the job fails. -565 */ -566 public static void main(String[] args) throws Exception { -567 int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args); -568 System.exit(res); +486 final String verboseKey = "--verbose"; +487 if (cmd.startsWith(verboseKey)) { +488 verbose = true; +489 continue; +490 } +491 +492 if (cmd.startsWith("--")) { +493 printUsage("Invalid argument '" + cmd + "'"); +494 } +495 +496 if (i == args.length-2) { +497 peerId = cmd; +498 } +499 +500 if (i == args.length-1) { +501 tableName = cmd; +502 } +503 } +504 } catch (Exception e) { +505 e.printStackTrace(); +506 printUsage("Can't start because " + e.getMessage()); +507 return false; +508 } +509 return true; +510 } +511 +512 private static void restoreDefaults() { +513 startTime = 0; +514 endTime = Long.MAX_VALUE; +515 batch = -1; +516 versions = -1; +517 tableName = null; +518 families = null; +519 peerId = null; +520 rowPrefixes = null; +521 includeDeletedCells = false; +522 } +523 +524 /* +525 * @param errorMsg Error message. Can be null. +526 */ +527 private static void printUsage(final String errorMsg) { +528 if (errorMsg != null && errorMsg.length() > 0) { +529 System.err.println("ERROR: " + errorMsg); +530 } +531 System.err.println("Usage: verifyrep [--starttime=X]" + +532 " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + +533 "[--batch=] [--verbose] <peerid> <tablename>"); +534 System.err.println(); +535 System.err.println("Options:"); +536 System.err.println(" starttime beginning of the time range"); +537 System.err.println(" without endtime means from starttime to forever"); +538 System.err.println(" endtime end of the time range"); +539 System.err.println(" versions number of cell versions to verify"); +540 System.err.println(" batch batch count for scan, " + +541 "note that result row counts will no longer be actual number of rows when you use this option"); +542 System.err.println(" raw includes raw scan if given in options"); +543 System.err.println(" families comma-separated list of families to copy"); +544 System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); +545 System.err.println(" delimiter the delimiter used in display around rowkey"); +546 System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + +547 "default value is 0 which disables the recompare."); +548 System.err.println(" verbose logs row keys of good rows");