From: git-site-role@apache.org
To: commits@hbase.apache.org
Date: Sat, 30 Sep 2017 15:14:05 -0000
Subject: [45/51] [partial] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/d41f56fe/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html
----------------------------------------------------------------------
diff --git a/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html b/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html
index b8fea52..afbce13 100644
--- a/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html
+++ b/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html
@@ -63,979 +63,1017 @@
055import org.apache.hadoop.hbase.security.token.TokenUtil; 056import org.apache.hadoop.hbase.util.Base64; 057import org.apache.hadoop.hbase.util.Bytes; -058import org.apache.hadoop.hbase.zookeeper.ZKConfig; -059import org.apache.hadoop.io.Writable; -060import org.apache.hadoop.mapreduce.InputFormat; -061import org.apache.hadoop.mapreduce.Job; -062import org.apache.hadoop.util.StringUtils; -063 -064import com.codahale.metrics.MetricRegistry; -065 -066/** -067 * Utility for {@link TableMapper} and {@link TableReducer} -068 */ -069@SuppressWarnings({ "rawtypes", "unchecked" }) -070@InterfaceAudience.Public -071public class TableMapReduceUtil { -072 private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class); -073 -074 /** -075 * Use this before submitting a TableMap job. It will appropriately set up -076 * the job. -077 * -078 * @param table The table name to read from. -079 * @param scan The scan instance with the columns, time range etc. -080 * @param mapper The mapper class to use. -081 * @param outputKeyClass The class of the output key. -082 * @param outputValueClass The class of the output value. -083 * @param job The current job to adjust.
Make sure the passed job is -084 * carrying all necessary HBase configuration. -085 * @throws IOException When setting up the details fails. -086 */ -087 public static void initTableMapperJob(String table, Scan scan, -088 Class<? extends TableMapper> mapper, -089 Class<?> outputKeyClass, -090 Class<?> outputValueClass, Job job) -091 throws IOException { -092 initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, -093 job, true); -094 } -095 +058import org.apache.hadoop.hbase.util.RegionSplitter; +059import org.apache.hadoop.hbase.zookeeper.ZKConfig; +060import org.apache.hadoop.io.Writable; +061import org.apache.hadoop.mapreduce.InputFormat; +062import org.apache.hadoop.mapreduce.Job; +063import org.apache.hadoop.util.StringUtils; +064 +065import com.codahale.metrics.MetricRegistry; +066 +067/** +068 * Utility for {@link TableMapper} and {@link TableReducer} +069 */ +070@SuppressWarnings({ "rawtypes", "unchecked" }) +071@InterfaceAudience.Public +072public class TableMapReduceUtil { +073 private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class); +074 +075 /** +076 * Use this before submitting a TableMap job. It will appropriately set up +077 * the job. +078 * +079 * @param table The table name to read from. +080 * @param scan The scan instance with the columns, time range etc. +081 * @param mapper The mapper class to use. +082 * @param outputKeyClass The class of the output key. +083 * @param outputValueClass The class of the output value. +084 * @param job The current job to adjust. Make sure the passed job is +085 * carrying all necessary HBase configuration. +086 * @throws IOException When setting up the details fails. +087 */ +088 public static void initTableMapperJob(String table, Scan scan, +089 Class<? extends TableMapper> mapper, +090 Class<?> outputKeyClass, +091 Class<?> outputValueClass, Job job) +092 throws IOException { +093 initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, +094 job, true); +095 } 096 -097 /** -098 * Use this before submitting a TableMap job. It will appropriately set up -099 * the job. -100 * -101 * @param table The table name to read from. -102 * @param scan The scan instance with the columns, time range etc. -103 * @param mapper The mapper class to use. -104 * @param outputKeyClass The class of the output key. -105 * @param outputValueClass The class of the output value. -106 * @param job The current job to adjust. Make sure the passed job is -107 * carrying all necessary HBase configuration. -108 * @throws IOException When setting up the details fails. -109 */ -110 public static void initTableMapperJob(TableName table, -111 Scan scan, -112 Class<? extends TableMapper> mapper, -113 Class<?> outputKeyClass, -114 Class<?> outputValueClass, -115 Job job) throws IOException { -116 initTableMapperJob(table.getNameAsString(), -117 scan, -118 mapper, -119 outputKeyClass, -120 outputValueClass, -121 job, -122 true); -123 } -124 -125 /** -126 * Use this before submitting a TableMap job. It will appropriately set up -127 * the job. -128 * -129 * @param table Binary representation of the table name to read from. -130 * @param scan The scan instance with the columns, time range etc. -131 * @param mapper The mapper class to use. -132 * @param outputKeyClass The class of the output key. -133 * @param outputValueClass The class of the output value. -134 * @param job The current job to adjust. Make sure the passed job is -135 * carrying all necessary HBase configuration. 
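
The Javadoc above describes the usual pattern: build the Scan and the Job first, then let TableMapReduceUtil wire in the input format, the HBase serializations and the credentials. A minimal sketch of the simplest String-table overload, assuming a hypothetical table named "mytable" and a hypothetical MyMapper that just counts rows:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TableReadSketch {

  // Hypothetical mapper: emits one count per row scanned.
  public static class MyMapper extends TableMapper<Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(new Text("rows"), ONE);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "table-read-sketch");
    job.setJarByClass(TableReadSketch.class);

    Scan scan = new Scan();
    scan.setCaching(500);        // larger caching is typical for MapReduce scans
    scan.setCacheBlocks(false);  // avoid filling the region server block cache

    // Sets TableInputFormat, the HBase serializations and, on secure clusters, credentials.
    TableMapReduceUtil.initTableMapperJob(
        "mytable",               // hypothetical table name
        scan,
        MyMapper.class,
        Text.class,              // map output key class
        IntWritable.class,       // map output value class
        job);

    job.setNumReduceTasks(0);    // map-only, for brevity
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
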
-136 * @throws IOException When setting up the details fails. -137 */ -138 public static void initTableMapperJob(byte[] table, Scan scan, -139 Class<? extends TableMapper> mapper, -140 Class<?> outputKeyClass, -141 Class<?> outputValueClass, Job job) -142 throws IOException { -143 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, -144 job, true); -145 } -146 -147 /** -148 * Use this before submitting a TableMap job. It will appropriately set up -149 * the job. -150 * -151 * @param table The table name to read from. -152 * @param scan The scan instance with the columns, time range etc. -153 * @param mapper The mapper class to use. -154 * @param outputKeyClass The class of the output key. -155 * @param outputValueClass The class of the output value. -156 * @param job The current job to adjust. Make sure the passed job is -157 * carrying all necessary HBase configuration. -158 * @param addDependencyJars upload HBase jars and jars for any of the configured -159 * job classes via the distributed cache (tmpjars). -160 * @throws IOException When setting up the details fails. -161 */ -162 public static void initTableMapperJob(String table, Scan scan, -163 Class<? extends TableMapper> mapper, -164 Class<?> outputKeyClass, -165 Class<?> outputValueClass, Job job, -166 boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass) -167 throws IOException { -168 initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, -169 addDependencyJars, true, inputFormatClass); -170 } -171 +097 +098 /** +099 * Use this before submitting a TableMap job. It will appropriately set up +100 * the job. +101 * +102 * @param table The table name to read from. +103 * @param scan The scan instance with the columns, time range etc. +104 * @param mapper The mapper class to use. +105 * @param outputKeyClass The class of the output key. +106 * @param outputValueClass The class of the output value. +107 * @param job The current job to adjust. Make sure the passed job is +108 * carrying all necessary HBase configuration. +109 * @throws IOException When setting up the details fails. +110 */ +111 public static void initTableMapperJob(TableName table, +112 Scan scan, +113 Class<? extends TableMapper> mapper, +114 Class<?> outputKeyClass, +115 Class<?> outputValueClass, +116 Job job) throws IOException { +117 initTableMapperJob(table.getNameAsString(), +118 scan, +119 mapper, +120 outputKeyClass, +121 outputValueClass, +122 job, +123 true); +124 } +125 +126 /** +127 * Use this before submitting a TableMap job. It will appropriately set up +128 * the job. +129 * +130 * @param table Binary representation of the table name to read from. +131 * @param scan The scan instance with the columns, time range etc. +132 * @param mapper The mapper class to use. +133 * @param outputKeyClass The class of the output key. +134 * @param outputValueClass The class of the output value. +135 * @param job The current job to adjust. Make sure the passed job is +136 * carrying all necessary HBase configuration. +137 * @throws IOException When setting up the details fails. +138 */ +139 public static void initTableMapperJob(byte[] table, Scan scan, +140 Class<? extends TableMapper> mapper, +141 Class<?> outputKeyClass, +142 Class<?> outputValueClass, Job job) +143 throws IOException { +144 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, +145 job, true); +146 } +147 +148 /** +149 * Use this before submitting a TableMap job. 
It will appropriately set up +150 * the job. +151 * +152 * @param table The table name to read from. +153 * @param scan The scan instance with the columns, time range etc. +154 * @param mapper The mapper class to use. +155 * @param outputKeyClass The class of the output key. +156 * @param outputValueClass The class of the output value. +157 * @param job The current job to adjust. Make sure the passed job is +158 * carrying all necessary HBase configuration. +159 * @param addDependencyJars upload HBase jars and jars for any of the configured +160 * job classes via the distributed cache (tmpjars). +161 * @throws IOException When setting up the details fails. +162 */ +163 public static void initTableMapperJob(String table, Scan scan, +164 Class<? extends TableMapper> mapper, +165 Class<?> outputKeyClass, +166 Class<?> outputValueClass, Job job, +167 boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass) +168 throws IOException { +169 initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, +170 addDependencyJars, true, inputFormatClass); +171 } 172 -173 /** -174 * Use this before submitting a TableMap job. It will appropriately set up -175 * the job. -176 * -177 * @param table The table name to read from. -178 * @param scan The scan instance with the columns, time range etc. -179 * @param mapper The mapper class to use. -180 * @param outputKeyClass The class of the output key. -181 * @param outputValueClass The class of the output value. -182 * @param job The current job to adjust. Make sure the passed job is -183 * carrying all necessary HBase configuration. -184 * @param addDependencyJars upload HBase jars and jars for any of the configured -185 * job classes via the distributed cache (tmpjars). -186 * @param initCredentials whether to initialize hbase auth credentials for the job -187 * @param inputFormatClass the input format -188 * @throws IOException When setting up the details fails. -189 */ -190 public static void initTableMapperJob(String table, Scan scan, -191 Class<? extends TableMapper> mapper, -192 Class<?> outputKeyClass, -193 Class<?> outputValueClass, Job job, -194 boolean addDependencyJars, boolean initCredentials, -195 Class<? extends InputFormat> inputFormatClass) -196 throws IOException { -197 job.setInputFormatClass(inputFormatClass); -198 if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass); -199 if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass); -200 job.setMapperClass(mapper); -201 if (Put.class.equals(outputValueClass)) { -202 job.setCombinerClass(PutCombiner.class); -203 } -204 Configuration conf = job.getConfiguration(); -205 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); -206 conf.set(TableInputFormat.INPUT_TABLE, table); -207 conf.set(TableInputFormat.SCAN, convertScanToString(scan)); -208 conf.setStrings("io.serializations", conf.get("io.serializations"), -209 MutationSerialization.class.getName(), ResultSerialization.class.getName(), -210 KeyValueSerialization.class.getName()); -211 if (addDependencyJars) { -212 addDependencyJars(job); -213 } -214 if (initCredentials) { -215 initCredentials(job); -216 } -217 } -218 -219 /** -220 * Use this before submitting a TableMap job. It will appropriately set up -221 * the job. -222 * -223 * @param table Binary representation of the table name to read from. -224 * @param scan The scan instance with the columns, time range etc. -225 * @param mapper The mapper class to use. 
-226 * @param outputKeyClass The class of the output key. -227 * @param outputValueClass The class of the output value. -228 * @param job The current job to adjust. Make sure the passed job is -229 * carrying all necessary HBase configuration. -230 * @param addDependencyJars upload HBase jars and jars for any of the configured -231 * job classes via the distributed cache (tmpjars). -232 * @param inputFormatClass The class of the input format -233 * @throws IOException When setting up the details fails. -234 */ -235 public static void initTableMapperJob(byte[] table, Scan scan, -236 Class<? extends TableMapper> mapper, -237 Class<?> outputKeyClass, -238 Class<?> outputValueClass, Job job, -239 boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass) -240 throws IOException { -241 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, -242 outputValueClass, job, addDependencyJars, inputFormatClass); -243 } -244 -245 /** -246 * Use this before submitting a TableMap job. It will appropriately set up -247 * the job. -248 * -249 * @param table Binary representation of the table name to read from. -250 * @param scan The scan instance with the columns, time range etc. -251 * @param mapper The mapper class to use. -252 * @param outputKeyClass The class of the output key. -253 * @param outputValueClass The class of the output value. -254 * @param job The current job to adjust. Make sure the passed job is -255 * carrying all necessary HBase configuration. -256 * @param addDependencyJars upload HBase jars and jars for any of the configured -257 * job classes via the distributed cache (tmpjars). -258 * @throws IOException When setting up the details fails. -259 */ -260 public static void initTableMapperJob(byte[] table, Scan scan, -261 Class<? extends TableMapper> mapper, -262 Class<?> outputKeyClass, -263 Class<?> outputValueClass, Job job, -264 boolean addDependencyJars) -265 throws IOException { -266 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, -267 outputValueClass, job, addDependencyJars, TableInputFormat.class); -268 } -269 -270 /** -271 * Use this before submitting a TableMap job. It will appropriately set up -272 * the job. -273 * -274 * @param table The table name to read from. -275 * @param scan The scan instance with the columns, time range etc. -276 * @param mapper The mapper class to use. -277 * @param outputKeyClass The class of the output key. -278 * @param outputValueClass The class of the output value. -279 * @param job The current job to adjust. Make sure the passed job is -280 * carrying all necessary HBase configuration. -281 * @param addDependencyJars upload HBase jars and jars for any of the configured -282 * job classes via the distributed cache (tmpjars). -283 * @throws IOException When setting up the details fails. -284 */ -285 public static void initTableMapperJob(String table, Scan scan, -286 Class<? extends TableMapper> mapper, -287 Class<?> outputKeyClass, -288 Class<?> outputValueClass, Job job, -289 boolean addDependencyJars) -290 throws IOException { -291 initTableMapperJob(table, scan, mapper, outputKeyClass, -292 outputValueClass, job, addDependencyJars, TableInputFormat.class); -293 } -294 -295 /** -296 * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on -297 * direct memory will likely cause the map tasks to OOM when opening the region. 
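
The addDependencyJars overloads above control whether the HBase client jars (and the jars of the configured job classes) are shipped to the cluster through the distributed cache (tmpjars). A fragment under the same assumptions as the earlier sketch, for the case where the submitted job jar is assumed to bundle its dependencies already, so shipping can be skipped (table name and mapper are hypothetical):

// Same Job and Scan setup as in the earlier sketch; only the init call changes.
TableMapReduceUtil.initTableMapperJob(
    "mytable",             // hypothetical table name
    scan,
    MyMapper.class,
    Text.class,
    IntWritable.class,
    job,
    false);                // addDependencyJars: skip tmpjars, job jar assumed self-contained
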
This -298 * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user -299 * wants to override this behavior in their job. -300 */ -301 public static void resetCacheConfig(Configuration conf) { -302 conf.setFloat( -303 HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); -304 conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f); -305 conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY); -306 } -307 -308 /** -309 * Sets up the job for reading from one or more table snapshots, with one or more scans -310 * per snapshot. -311 * It bypasses hbase servers and read directly from snapshot files. -312 * -313 * @param snapshotScans map of snapshot name to scans on that snapshot. -314 * @param mapper The mapper class to use. -315 * @param outputKeyClass The class of the output key. -316 * @param outputValueClass The class of the output value. -317 * @param job The current job to adjust. Make sure the passed job is -318 * carrying all necessary HBase configuration. -319 * @param addDependencyJars upload HBase jars and jars for any of the configured -320 * job classes via the distributed cache (tmpjars). -321 */ -322 public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans, -323 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, -324 Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { -325 MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir); -326 -327 job.setInputFormatClass(MultiTableSnapshotInputFormat.class); -328 if (outputValueClass != null) { -329 job.setMapOutputValueClass(outputValueClass); -330 } -331 if (outputKeyClass != null) { -332 job.setMapOutputKeyClass(outputKeyClass); -333 } -334 job.setMapperClass(mapper); -335 Configuration conf = job.getConfiguration(); -336 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); -337 -338 if (addDependencyJars) { -339 addDependencyJars(job); -340 addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class); -341 } -342 -343 resetCacheConfig(job.getConfiguration()); -344 } -345 -346 /** -347 * Sets up the job for reading from a table snapshot. It bypasses hbase servers -348 * and read directly from snapshot files. -349 * -350 * @param snapshotName The name of the snapshot (of a table) to read from. -351 * @param scan The scan instance with the columns, time range etc. -352 * @param mapper The mapper class to use. -353 * @param outputKeyClass The class of the output key. -354 * @param outputValueClass The class of the output value. -355 * @param job The current job to adjust. Make sure the passed job is -356 * carrying all necessary HBase configuration. -357 * @param addDependencyJars upload HBase jars and jars for any of the configured -358 * job classes via the distributed cache (tmpjars). -359 * -360 * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should -361 * have write permissions to this directory, and this should not be a subdirectory of rootdir. -362 * After the job is finished, restore directory can be deleted. -363 * @throws IOException When setting up the details fails. -364 * @see TableSnapshotInputFormat -365 */ -366 public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, -367 Class<? 
extends TableMapper> mapper, -368 Class<?> outputKeyClass, -369 Class<?> outputValueClass, Job job, -370 boolean addDependencyJars, Path tmpRestoreDir) -371 throws IOException { -372 TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); -373 initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, -374 outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class); -375 resetCacheConfig(job.getConfiguration()); -376 } -377 -378 /** -379 * Use this before submitting a Multi TableMap job. It will appropriately set -380 * up the job. -381 * -382 * @param scans The list of {@link Scan} objects to read from. -383 * @param mapper The mapper class to use. -384 * @param outputKeyClass The class of the output key. -385 * @param outputValueClass The class of the output value. -386 * @param job The current job to adjust. Make sure the passed job is carrying -387 * all necessary HBase configuration. -388 * @throws IOException When setting up the details fails. -389 */ -390 public static void initTableMapperJob(List<Scan> scans, -391 Class<? extends TableMapper> mapper, -392 Class<?> outputKeyClass, -393 Class<?> outputValueClass, Job job) throws IOException { -394 initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, -395 true); -396 } -397 -398 /** -399 * Use this before submitting a Multi TableMap job. It will appropriately set -400 * up the job. -401 * -402 * @param scans The list of {@link Scan} objects to read from. -403 * @param mapper The mapper class to use. -404 * @param outputKeyClass The class of the output key. -405 * @param outputValueClass The class of the output value. -406 * @param job The current job to adjust. Make sure the passed job is carrying -407 * all necessary HBase configuration. -408 * @param addDependencyJars upload HBase jars and jars for any of the -409 * configured job classes via the distributed cache (tmpjars). -410 * @throws IOException When setting up the details fails. -411 */ -412 public static void initTableMapperJob(List<Scan> scans, -413 Class<? extends TableMapper> mapper, -414 Class<?> outputKeyClass, -415 Class<?> outputValueClass, Job job, -416 boolean addDependencyJars) throws IOException { -417 initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, -418 addDependencyJars, true); -419 } -420 -421 /** -422 * Use this before submitting a Multi TableMap job. It will appropriately set -423 * up the job. -424 * -425 * @param scans The list of {@link Scan} objects to read from. -426 * @param mapper The mapper class to use. -427 * @param outputKeyClass The class of the output key. -428 * @param outputValueClass The class of the output value. -429 * @param job The current job to adjust. Make sure the passed job is carrying -430 * all necessary HBase configuration. -431 * @param addDependencyJars upload HBase jars and jars for any of the -432 * configured job classes via the distributed cache (tmpjars). -433 * @param initCredentials whether to initialize hbase auth credentials for the job -434 * @throws IOException When setting up the details fails. -435 */ -436 public static void initTableMapperJob(List<Scan> scans, -437 Class<? 
extends TableMapper> mapper, -438 Class<?> outputKeyClass, -439 Class<?> outputValueClass, Job job, -440 boolean addDependencyJars, -441 boolean initCredentials) throws IOException { -442 job.setInputFormatClass(MultiTableInputFormat.class); -443 if (outputValueClass != null) { -444 job.setMapOutputValueClass(outputValueClass); -445 } -446 if (outputKeyClass != null) { -447 job.setMapOutputKeyClass(outputKeyClass); -448 } -449 job.setMapperClass(mapper); -450 Configuration conf = job.getConfiguration(); -451 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); -452 List<String> scanStrings = new ArrayList<>(); -453 -454 for (Scan scan : scans) { -455 scanStrings.add(convertScanToString(scan)); -456 } -457 job.getConfiguration().setStrings(MultiTableInputFormat.SCANS, -458 scanStrings.toArray(new String[scanStrings.size()])); -459 -460 if (addDependencyJars) { -461 addDependencyJars(job); -462 } -463 -464 if (initCredentials) { -465 initCredentials(job); -466 } -467 } -468 -469 public static void initCredentials(Job job) throws IOException { -470 UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); -471 if (userProvider.isHadoopSecurityEnabled()) { -472 // propagate delegation related props from launcher job to MR job -473 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { -474 job.getConfiguration().set("mapreduce.job.credentials.binary", -475 System.getenv("HADOOP_TOKEN_FILE_LOCATION")); -476 } -477 } -478 -479 if (userProvider.isHBaseSecurityEnabled()) { -480 try { -481 // init credentials for remote cluster -482 String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); -483 User user = userProvider.getCurrent(); -484 if (quorumAddress != null) { -485 Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), -486 quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX); -487 Connection peerConn = ConnectionFactory.createConnection(peerConf); -488 try { -489 TokenUtil.addTokenForJob(peerConn, user, job); -490 } finally { -491 peerConn.close(); -492 } -493 } -494 -495 Connection conn = ConnectionFactory.createConnection(job.getConfiguration()); -496 try { -497 TokenUtil.addTokenForJob(conn, user, job); -498 } finally { -499 conn.close(); -500 } -501 } catch (InterruptedException ie) { -502 LOG.info("Interrupted obtaining user authentication token"); -503 Thread.currentThread().interrupt(); -504 } -505 } -506 } -507 -508 /** -509 * Obtain an authentication token, for the specified cluster, on behalf of the current user -510 * and add it to the credentials for the given map reduce job. -511 * -512 * The quorumAddress is the key to the ZK ensemble, which contains: -513 * hbase.zookeeper.quorum, hbase.zookeeper.client.port and -514 * zookeeper.znode.parent -515 * -516 * @param job The job that requires the permission. -517 * @param quorumAddress string that contains the 3 required configuratins -518 * @throws IOException When the authentication token cannot be obtained. -519 * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead. 
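
For the List<Scan> overloads above, MultiTableInputFormat expects every Scan to name its source table through a scan attribute; the utility then serializes the scans into the job configuration under MultiTableInputFormat.SCANS. A fragment under the same assumptions as the earlier sketch (table names are hypothetical):

// One Scan per source table; the table name rides along as a scan attribute.
List<Scan> scans = new ArrayList<>();
for (String name : Arrays.asList("table1", "table2")) {   // hypothetical tables
  Scan scan = new Scan();
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(name));
  scans.add(scan);
}
TableMapReduceUtil.initTableMapperJob(
    scans,
    MyMapper.class,        // hypothetical TableMapper from the earlier sketch
    Text.class,
    IntWritable.class,
    job);
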
-520 */ -521 @Deprecated -522 public static void initCredentialsForCluster(Job job, String quorumAddress) -523 throws IOException { -524 Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), -525 quorumAddress); -526 initCredentialsForCluster(job, peerConf); -527 } -528 -529 /** -530 * Obtain an authentication token, for the specified cluster, on behalf of the current user -531 * and add it to the credentials for the given map reduce job. -532 * -533 * @param job The job that requires the permission. -534 * @param conf The configuration to use in connecting to the peer cluster -535 * @throws IOException When the authentication token cannot be obtained. -536 */ -537 public static void initCredentialsForCluster(Job job, Configuration conf) -538 throws IOException { -539 UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); -540 if (userProvider.isHBaseSecurityEnabled()) { -541 try { -542 Connection peerConn = ConnectionFactory.createConnection(conf); -543 try { -544 TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job); -545 } finally { -546 peerConn.close(); -547 } -548 } catch (InterruptedException e) { -549 LOG.info("Interrupted obtaining user authentication token"); -550 Thread.interrupted(); -551 } -552 } -553 } -554 -555 /** -556 * Writes the given scan into a Base64 encoded string. -557 * -558 * @param scan The scan to write out. -559 * @return The scan saved in a Base64 encoded string. -560 * @throws IOException When writing the scan fails. -561 */ -562 public static String convertScanToString(Scan scan) throws IOException { -563 ClientProtos.Scan proto = ProtobufUtil.toScan(scan); -564 return Base64.encodeBytes(proto.toByteArray()); +173 +174 /** +175 * Use this before submitting a TableMap job. It will appropriately set up +176 * the job. +177 * +178 * @param table The table name to read from. +179 * @param scan The scan instance with the columns, time range etc. +180 * @param mapper The mapper class to use. +181 * @param outputKeyClass The class of the output key. +182 * @param outputValueClass The class of the output value. +183 * @param job The current job to adjust. Make sure the passed job is +184 * carrying all necessary HBase configuration. +185 * @param addDependencyJars upload HBase jars and jars for any of the configured +186 * job classes via the distributed cache (tmpjars). +187 * @param initCredentials whether to initialize hbase auth credentials for the job +188 * @param inputFormatClass the input format +189 * @throws IOException When setting up the details fails. +190 */ +191 public static void initTableMapperJob(String table, Scan scan, +192 Class<? extends TableMapper> mapper, +193 Class<?> outputKeyClass, +194 Class<?> outputValueClass, Job job, +195 boolean addDependencyJars, boolean initCredentials, +196 Class<? 
extends InputFormat> inputFormatClass) +197 throws IOException { +198 job.setInputFormatClass(inputFormatClass); +199 if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass); +200 if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass); +201 job.setMapperClass(mapper); +202 if (Put.class.equals(outputValueClass)) { +203 job.setCombinerClass(PutCombiner.class); +204 } +205 Configuration conf = job.getConfiguration(); +206 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); +207 conf.set(TableInputFormat.INPUT_TABLE, table); +208 conf.set(TableInputFormat.SCAN, convertScanToString(scan)); +209 conf.setStrings("io.serializations", conf.get("io.serializations"), +210 MutationSerialization.class.getName(), ResultSerialization.class.getName(), +211 KeyValueSerialization.class.getName()); +212 if (addDependencyJars) { +213 addDependencyJars(job); +214 } +215 if (initCredentials) { +216 initCredentials(job); +217 } +218 } +219 +220 /** +221 * Use this before submitting a TableMap job. It will appropriately set up +222 * the job. +223 * +224 * @param table Binary representation of the table name to read from. +225 * @param scan The scan instance with the columns, time range etc. +226 * @param mapper The mapper class to use. +227 * @param outputKeyClass The class of the output key. +228 * @param outputValueClass The class of the output value. +229 * @param job The current job to adjust. Make sure the passed job is +230 * carrying all necessary HBase configuration. +231 * @param addDependencyJars upload HBase jars and jars for any of the configured +232 * job classes via the distributed cache (tmpjars). +233 * @param inputFormatClass The class of the input format +234 * @throws IOException When setting up the details fails. +235 */ +236 public static void initTableMapperJob(byte[] table, Scan scan, +237 Class<? extends TableMapper> mapper, +238 Class<?> outputKeyClass, +239 Class<?> outputValueClass, Job job, +240 boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass) +241 throws IOException { +242 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, +243 outputValueClass, job, addDependencyJars, inputFormatClass); +244 } +245 +246 /** +247 * Use this before submitting a TableMap job. It will appropriately set up +248 * the job. +249 * +250 * @param table Binary representation of the table name to read from. +251 * @param scan The scan instance with the columns, time range etc. +252 * @param mapper The mapper class to use. +253 * @param outputKeyClass The class of the output key. +254 * @param outputValueClass The class of the output value. +255 * @param job The current job to adjust. Make sure the passed job is +256 * carrying all necessary HBase configuration. +257 * @param addDependencyJars upload HBase jars and jars for any of the configured +258 * job classes via the distributed cache (tmpjars). +259 * @throws IOException When setting up the details fails. +260 */ +261 public static void initTableMapperJob(byte[] table, Scan scan, +262 Class<? extends TableMapper> mapper, +263 Class<?> outputKeyClass, +264 Class<?> outputValueClass, Job job, +265 boolean addDependencyJars) +266 throws IOException { +267 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, +268 outputValueClass, job, addDependencyJars, TableInputFormat.class); +269 } +270 +271 /** +272 * Use this before submitting a TableMap job. It will appropriately set up +273 * the job. +274 * +275 * @param table The table name to read from. 
+276 * @param scan The scan instance with the columns, time range etc. +277 * @param mapper The mapper class to use. +278 * @param outputKeyClass The class of the output key. +279 * @param outputValueClass The class of the output value. +280 * @param job The current job to adjust. Make sure the passed job is +281 * carrying all necessary HBase configuration. +282 * @param addDependencyJars upload HBase jars and jars for any of the configured +283 * job classes via the distributed cache (tmpjars). +284 * @throws IOException When setting up the details fails. +285 */ +286 public static void initTableMapperJob(String table, Scan scan, +287 Class<? extends TableMapper> mapper, +288 Class<?> outputKeyClass, +289 Class<?> outputValueClass, Job job, +290 boolean addDependencyJars) +291 throws IOException { +292 initTableMapperJob(table, scan, mapper, outputKeyClass, +293 outputValueClass, job, addDependencyJars, TableInputFormat.class); +294 } +295 +296 /** +297 * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on +298 * direct memory will likely cause the map tasks to OOM when opening the region. This +299 * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user +300 * wants to override this behavior in their job. +301 */ +302 public static void resetCacheConfig(Configuration conf) { +303 conf.setFloat( +304 HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); +305 conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f); +306 conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY); +307 } +308 +309 /** +310 * Sets up the job for reading from one or more table snapshots, with one or more scans +311 * per snapshot. +312 * It bypasses hbase servers and read directly from snapshot files. +313 * +314 * @param snapshotScans map of snapshot name to scans on that snapshot. +315 * @param mapper The mapper class to use. +316 * @param outputKeyClass The class of the output key. +317 * @param outputValueClass The class of the output value. +318 * @param job The current job to adjust. Make sure the passed job is +319 * carrying all necessary HBase configuration. +320 * @param addDependencyJars upload HBase jars and jars for any of the configured +321 * job classes via the distributed cache (tmpjars). +322 */ +323 public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans, +324 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, +325 Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { +326 MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir); +327 +328 job.setInputFormatClass(MultiTableSnapshotInputFormat.class); +329 if (outputValueClass != null) { +330 job.setMapOutputValueClass(outputValueClass); +331 } +332 if (outputKeyClass != null) { +333 job.setMapOutputKeyClass(outputKeyClass); +334 } +335 job.setMapperClass(mapper); +336 Configuration conf = job.getConfiguration(); +337 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); +338 +339 if (addDependencyJars) { +340 addDependencyJars(job); +341 addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class); +342 } +343 +344 resetCacheConfig(job.getConfiguration()); +345 } +346 +347 /** +348 * Sets up the job for reading from a table snapshot. It bypasses hbase servers +349 * and read directly from snapshot files. +350 * +351 * @param snapshotName The name of the snapshot (of a table) to read from. 
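
initTableSnapshotMapperJob, shown in full above, reads the snapshot files directly from the filesystem instead of going through the region servers, which is why it also calls resetCacheConfig to avoid direct-memory block cache allocations in the map tasks. A fragment under the same assumptions as the earlier sketch; the snapshot name and restore path are hypothetical, and per the Javadoc the restore directory must be writable by the submitting user and must not sit under the HBase root directory:

// Restore directory for the snapshot files; it can be deleted once the job has finished.
Path tmpRestoreDir = new Path("/tmp/snapshot_restore");   // hypothetical path
TableMapReduceUtil.initTableSnapshotMapperJob(
    "my_snapshot",         // hypothetical snapshot name
    new Scan(),
    MyMapper.class,        // hypothetical TableMapper from the earlier sketch
    Text.class,
    IntWritable.class,
    job,
    true,                  // addDependencyJars
    tmpRestoreDir);
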
+352 * @param scan The scan instance with the columns, time range etc. +353 * @param mapper The mapper class to use. +354 * @param outputKeyClass The class of the output key. +355 * @param outputValueClass The class of the output value. +356 * @param job The current job to adjust. Make sure the passed job is +357 * carrying all necessary HBase configuration. +358 * @param addDependencyJars upload HBase jars and jars for any of the configured +359 * job classes via the distributed cache (tmpjars). +360 * +361 * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should +362 * have write permissions to this directory, and this should not be a subdirectory of rootdir. +363 * After the job is finished, restore directory can be deleted. +364 * @throws IOException When setting up the details fails. +365 * @see TableSnapshotInputFormat +366 */ +367 public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, +368 Class<? extends TableMapper> mapper, +369 Class<?> outputKeyClass, +370 Class<?> outputValueClass, Job job, +371 boolean addDependencyJars, Path tmpRestoreDir) +372 throws IOException { +373 TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); +374 initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, +375 outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class); +376 resetCacheConfig(job.getConfiguration()); +377 } +378 +379 /** +380 * Sets up the job for reading from a table snapshot. It bypasses hbase servers +381 * and read directly from snapshot files. +382 * +383 * @param snapshotName The name of the snapshot (of a table) to read from. +384 * @param scan The scan instance with the columns, time range etc. +385 * @param mapper The mapper class to use. +386 * @param outputKeyClass The class of the output key. +387 * @param outputValueClass The class of the output value. +388 * @param job The current job to adjust. Make sure the passed job is +389 * carrying all necessary HBase configuration. +390 * @param addDependencyJars upload HBase jars and jars for any of the configured +391 * job classes via the distributed cache (tmpjars). +392 * +393 * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should +394 * have write permissions to this directory, and this should not be a subdirectory of rootdir. +395 * After the job is finished, restore directory can be deleted. +396 * @param splitAlgo algorithm to split +397 * @param numSplitsPerRegion