Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4DD3E18F41 for ; Thu, 21 Jan 2016 17:22:29 +0000 (UTC) Received: (qmail 12525 invoked by uid 500); 21 Jan 2016 17:22:22 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 12414 invoked by uid 500); 21 Jan 2016 17:22:22 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 10641 invoked by uid 99); 21 Jan 2016 17:22:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Jan 2016 17:22:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 724ECE0BB6; Thu, 21 Jan 2016 17:22:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Thu, 21 Jan 2016 17:22:57 -0000 Message-Id: In-Reply-To: <260fbf76833c437ea038951881d40890@git.apache.org> References: <260fbf76833c437ea038951881d40890@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [38/51] [partial] hbase-site git commit: Published site at 4bf6f8379d7f85413b914dddf607d016780d40ce. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/75eda567/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.html ---------------------------------------------------------------------- diff --git a/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.html b/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.html index cf443ff..1adf99a 100644 --- a/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.html +++ b/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.html @@ -118,195 +118,191 @@ 110 @Override 111 public void close() throws IOException { 112 trr.close(); -113 if (connection != null) { -114 connection.close(); -115 } -116 } -117 -118 @Override -119 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { -120 return trr.getCurrentKey(); -121 } -122 -123 @Override -124 public Result getCurrentValue() throws IOException, InterruptedException { -125 return trr.getCurrentValue(); -126 } -127 -128 @Override -129 public float getProgress() throws IOException, InterruptedException { -130 return trr.getProgress(); -131 } -132 -133 @Override -134 public void initialize(InputSplit inputsplit, TaskAttemptContext context) -135 throws IOException, InterruptedException { -136 trr.initialize(inputsplit, context); -137 } -138 -139 @Override -140 public boolean nextKeyValue() throws IOException, InterruptedException { -141 return trr.nextKeyValue(); -142 } -143 }; -144 } catch (IOException ioe) { -145 // If there is an exception make sure that all -146 // resources are closed and released. -147 trr.close(); -148 if (connection != null) { -149 connection.close(); -150 } -151 throw ioe; -152 } -153 } -154 -155 /** -156 * Calculates the splits that will serve as input for the map tasks. The -157 * number of splits matches the number of regions in a table. -158 * -159 * @param context The current job context. -160 * @return The list of input splits. -161 * @throws IOException When creating the list of splits fails. -162 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext) -163 */ -164 @Override -165 public List<InputSplit> getSplits(JobContext context) throws IOException { -166 if (scans.isEmpty()) { -167 throw new IOException("No scans were provided."); -168 } -169 -170 Map<TableName, List<Scan>> tableMaps = new HashMap<TableName, List<Scan>>(); -171 for (Scan scan : scans) { -172 byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME); -173 if (tableNameBytes == null) -174 throw new IOException("A scan object did not have a table name"); -175 -176 TableName tableName = TableName.valueOf(tableNameBytes); -177 -178 List<Scan> scanList = tableMaps.get(tableName); -179 if (scanList == null) { -180 scanList = new ArrayList<Scan>(); -181 tableMaps.put(tableName, scanList); -182 } -183 scanList.add(scan); -184 } -185 -186 List<InputSplit> splits = new ArrayList<InputSplit>(); -187 Iterator iter = tableMaps.entrySet().iterator(); -188 while (iter.hasNext()) { -189 Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next(); -190 TableName tableName = entry.getKey(); -191 List<Scan> scanList = entry.getValue(); -192 -193 try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration()); -194 Table table = conn.getTable(tableName); -195 RegionLocator regionLocator = conn.getRegionLocator(tableName)) { -196 RegionSizeCalculator sizeCalculator = new RegionSizeCalculator( -197 regionLocator, conn.getAdmin()); -198 Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); -199 for (Scan scan : scanList) { -200 if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { -201 throw new IOException("Expecting at least one region for table : " -202 + tableName.getNameAsString()); -203 } -204 int count = 0; -205 -206 byte[] startRow = scan.getStartRow(); -207 byte[] stopRow = scan.getStopRow(); -208 -209 for (int i = 0; i < keys.getFirst().length; i++) { -210 if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { -211 continue; -212 } -213 -214 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || -215 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && -216 (stopRow.length == 0 || Bytes.compareTo(stopRow, -217 keys.getFirst()[i]) > 0)) { -218 byte[] splitStart = startRow.length == 0 || -219 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? -220 keys.getFirst()[i] : startRow; -221 byte[] splitStop = (stopRow.length == 0 || -222 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && -223 keys.getSecond()[i].length > 0 ? -224 keys.getSecond()[i] : stopRow; -225 -226 HRegionLocation hregionLocation = regionLocator.getRegionLocation( -227 keys.getFirst()[i], false); -228 String regionHostname = hregionLocation.getHostname(); -229 HRegionInfo regionInfo = hregionLocation.getRegionInfo(); -230 long regionSize = sizeCalculator.getRegionSize( -231 regionInfo.getRegionName()); -232 -233 TableSplit split = new TableSplit(table.getName(), -234 scan, splitStart, splitStop, regionHostname, regionSize); -235 -236 splits.add(split); -237 -238 if (LOG.isDebugEnabled()) -239 LOG.debug("getSplits: split -> " + (count++) + " -> " + split); -240 } -241 } -242 } -243 } -244 } -245 -246 return splits; -247 } -248 -249 /** -250 * Test if the given region is to be included in the InputSplit while -251 * splitting the regions of a table. -252 * <p> -253 * This optimization is effective when there is a specific reasoning to -254 * exclude an entire region from the M-R job, (and hence, not contributing to -255 * the InputSplit), given the start and end keys of the same. <br> -256 * Useful when we need to remember the last-processed top record and revisit -257 * the [last, current) interval for M-R processing, continuously. In addition -258 * to reducing InputSplits, reduces the load on the region server as well, due -259 * to the ordering of the keys. <br> -260 * <br> -261 * Note: It is possible that <code>endKey.length() == 0 </code> , for the last -262 * (recent) region. <br> -263 * Override this method, if you want to bulk exclude regions altogether from -264 * M-R. By default, no region is excluded( i.e. all regions are included). -265 * -266 * @param startKey Start key of the region -267 * @param endKey End key of the region -268 * @return true, if this region needs to be included as part of the input -269 * (default). -270 */ -271 protected boolean includeRegionInSplit(final byte[] startKey, -272 final byte[] endKey) { -273 return true; -274 } -275 -276 /** -277 * Allows subclasses to get the list of {@link Scan} objects. -278 */ -279 protected List<Scan> getScans() { -280 return this.scans; -281 } -282 -283 /** -284 * Allows subclasses to set the list of {@link Scan} objects. -285 * -286 * @param scans The list of {@link Scan} used to define the input -287 */ -288 protected void setScans(List<Scan> scans) { -289 this.scans = scans; -290 } -291 -292 /** -293 * Allows subclasses to set the {@link TableRecordReader}. -294 * -295 * @param tableRecordReader A different {@link TableRecordReader} -296 * implementation. -297 */ -298 protected void setTableRecordReader(TableRecordReader tableRecordReader) { -299 this.tableRecordReader = tableRecordReader; -300 } -301} +113 connection.close(); +114 } +115 +116 @Override +117 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { +118 return trr.getCurrentKey(); +119 } +120 +121 @Override +122 public Result getCurrentValue() throws IOException, InterruptedException { +123 return trr.getCurrentValue(); +124 } +125 +126 @Override +127 public float getProgress() throws IOException, InterruptedException { +128 return trr.getProgress(); +129 } +130 +131 @Override +132 public void initialize(InputSplit inputsplit, TaskAttemptContext context) +133 throws IOException, InterruptedException { +134 trr.initialize(inputsplit, context); +135 } +136 +137 @Override +138 public boolean nextKeyValue() throws IOException, InterruptedException { +139 return trr.nextKeyValue(); +140 } +141 }; +142 } catch (IOException ioe) { +143 // If there is an exception make sure that all +144 // resources are closed and released. +145 trr.close(); +146 connection.close(); +147 throw ioe; +148 } +149 } +150 +151 /** +152 * Calculates the splits that will serve as input for the map tasks. The +153 * number of splits matches the number of regions in a table. +154 * +155 * @param context The current job context. +156 * @return The list of input splits. +157 * @throws IOException When creating the list of splits fails. +158 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext) +159 */ +160 @Override +161 public List<InputSplit> getSplits(JobContext context) throws IOException { +162 if (scans.isEmpty()) { +163 throw new IOException("No scans were provided."); +164 } +165 +166 Map<TableName, List<Scan>> tableMaps = new HashMap<TableName, List<Scan>>(); +167 for (Scan scan : scans) { +168 byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME); +169 if (tableNameBytes == null) +170 throw new IOException("A scan object did not have a table name"); +171 +172 TableName tableName = TableName.valueOf(tableNameBytes); +173 +174 List<Scan> scanList = tableMaps.get(tableName); +175 if (scanList == null) { +176 scanList = new ArrayList<Scan>(); +177 tableMaps.put(tableName, scanList); +178 } +179 scanList.add(scan); +180 } +181 +182 List<InputSplit> splits = new ArrayList<InputSplit>(); +183 Iterator iter = tableMaps.entrySet().iterator(); +184 while (iter.hasNext()) { +185 Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next(); +186 TableName tableName = entry.getKey(); +187 List<Scan> scanList = entry.getValue(); +188 +189 try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration()); +190 Table table = conn.getTable(tableName); +191 RegionLocator regionLocator = conn.getRegionLocator(tableName)) { +192 RegionSizeCalculator sizeCalculator = new RegionSizeCalculator( +193 regionLocator, conn.getAdmin()); +194 Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); +195 for (Scan scan : scanList) { +196 if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { +197 throw new IOException("Expecting at least one region for table : " +198 + tableName.getNameAsString()); +199 } +200 int count = 0; +201 +202 byte[] startRow = scan.getStartRow(); +203 byte[] stopRow = scan.getStopRow(); +204 +205 for (int i = 0; i < keys.getFirst().length; i++) { +206 if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { +207 continue; +208 } +209 +210 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || +211 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && +212 (stopRow.length == 0 || Bytes.compareTo(stopRow, +213 keys.getFirst()[i]) > 0)) { +214 byte[] splitStart = startRow.length == 0 || +215 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? +216 keys.getFirst()[i] : startRow; +217 byte[] splitStop = (stopRow.length == 0 || +218 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && +219 keys.getSecond()[i].length > 0 ? +220 keys.getSecond()[i] : stopRow; +221 +222 HRegionLocation hregionLocation = regionLocator.getRegionLocation( +223 keys.getFirst()[i], false); +224 String regionHostname = hregionLocation.getHostname(); +225 HRegionInfo regionInfo = hregionLocation.getRegionInfo(); +226 long regionSize = sizeCalculator.getRegionSize( +227 regionInfo.getRegionName()); +228 +229 TableSplit split = new TableSplit(table.getName(), +230 scan, splitStart, splitStop, regionHostname, regionSize); +231 +232 splits.add(split); +233 +234 if (LOG.isDebugEnabled()) +235 LOG.debug("getSplits: split -> " + (count++) + " -> " + split); +236 } +237 } +238 } +239 } +240 } +241 +242 return splits; +243 } +244 +245 /** +246 * Test if the given region is to be included in the InputSplit while +247 * splitting the regions of a table. +248 * <p> +249 * This optimization is effective when there is a specific reasoning to +250 * exclude an entire region from the M-R job, (and hence, not contributing to +251 * the InputSplit), given the start and end keys of the same. <br> +252 * Useful when we need to remember the last-processed top record and revisit +253 * the [last, current) interval for M-R processing, continuously. In addition +254 * to reducing InputSplits, reduces the load on the region server as well, due +255 * to the ordering of the keys. <br> +256 * <br> +257 * Note: It is possible that <code>endKey.length() == 0 </code> , for the last +258 * (recent) region. <br> +259 * Override this method, if you want to bulk exclude regions altogether from +260 * M-R. By default, no region is excluded( i.e. all regions are included). +261 * +262 * @param startKey Start key of the region +263 * @param endKey End key of the region +264 * @return true, if this region needs to be included as part of the input +265 * (default). +266 */ +267 protected boolean includeRegionInSplit(final byte[] startKey, +268 final byte[] endKey) { +269 return true; +270 } +271 +272 /** +273 * Allows subclasses to get the list of {@link Scan} objects. +274 */ +275 protected List<Scan> getScans() { +276 return this.scans; +277 } +278 +279 /** +280 * Allows subclasses to set the list of {@link Scan} objects. +281 * +282 * @param scans The list of {@link Scan} used to define the input +283 */ +284 protected void setScans(List<Scan> scans) { +285 this.scans = scans; +286 } +287 +288 /** +289 * Allows subclasses to set the {@link TableRecordReader}. +290 * +291 * @param tableRecordReader A different {@link TableRecordReader} +292 * implementation. +293 */ +294 protected void setTableRecordReader(TableRecordReader tableRecordReader) { +295 this.tableRecordReader = tableRecordReader; +296 } +297} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/75eda567/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html ---------------------------------------------------------------------- diff --git a/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html b/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html index 79ac839..4c12385 100644 --- a/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html +++ b/apidocs/src-html/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html @@ -119,170 +119,172 @@ 111 * org.apache.hadoop.conf.Configuration) 112 */ 113 @Override -114 public void setConf(Configuration configuration) { -115 this.conf = configuration; -116 -117 Scan scan = null; +114 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION", +115 justification="Intentional") +116 public void setConf(Configuration configuration) { +117 this.conf = configuration; 118 -119 if (conf.get(SCAN) != null) { -120 try { -121 scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN)); -122 } catch (IOException e) { -123 LOG.error("An error occurred.", e); -124 } -125 } else { -126 try { -127 scan = new Scan(); -128 -129 if (conf.get(SCAN_ROW_START) != null) { -130 scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START))); -131 } -132 -133 if (conf.get(SCAN_ROW_STOP) != null) { -134 scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP))); -135 } -136 -137 if (conf.get(SCAN_COLUMNS) != null) { -138 addColumns(scan, conf.get(SCAN_COLUMNS)); -139 } -140 -141 if (conf.get(SCAN_COLUMN_FAMILY) != null) { -142 scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY))); -143 } -144 -145 if (conf.get(SCAN_TIMESTAMP) != null) { -146 scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP))); -147 } -148 -149 if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) { -150 scan.setTimeRange( -151 Long.parseLong(conf.get(SCAN_TIMERANGE_START)), -152 Long.parseLong(conf.get(SCAN_TIMERANGE_END))); -153 } -154 -155 if (conf.get(SCAN_MAXVERSIONS) != null) { -156 scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS))); -157 } -158 -159 if (conf.get(SCAN_CACHEDROWS) != null) { -160 scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS))); -161 } -162 -163 if (conf.get(SCAN_BATCHSIZE) != null) { -164 scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE))); -165 } -166 -167 // false by default, full table scans generate too much BC churn -168 scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false))); -169 } catch (Exception e) { -170 LOG.error(StringUtils.stringifyException(e)); -171 } -172 } -173 -174 setScan(scan); -175 } -176 -177 @Override -178 protected void initialize(JobContext context) throws IOException { -179 // Do we have to worry about mis-matches between the Configuration from setConf and the one -180 // in this context? -181 TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); -182 try { -183 initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); -184 } catch (Exception e) { -185 LOG.error(StringUtils.stringifyException(e)); -186 } -187 } -188 -189 /** -190 * Parses a combined family and qualifier and adds either both or just the -191 * family in case there is no qualifier. This assumes the older colon -192 * divided notation, e.g. "family:qualifier". -193 * -194 * @param scan The Scan to update. -195 * @param familyAndQualifier family and qualifier -196 * @throws IllegalArgumentException When familyAndQualifier is invalid. -197 */ -198 private static void addColumn(Scan scan, byte[] familyAndQualifier) { -199 byte [][] fq = KeyValue.parseColumn(familyAndQualifier); -200 if (fq.length == 1) { -201 scan.addFamily(fq[0]); -202 } else if (fq.length == 2) { -203 scan.addColumn(fq[0], fq[1]); -204 } else { -205 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); -206 } -207 } -208 -209 /** -210 * Adds an array of columns specified using old format, family:qualifier. -211 * <p> -212 * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])}for any families in the -213 * input. -214 * -215 * @param scan The Scan to update. -216 * @param columns array of columns, formatted as <code>family:qualifier</code> -217 * @see Scan#addColumn(byte[], byte[]) -218 */ -219 public static void addColumns(Scan scan, byte [][] columns) { -220 for (byte[] column : columns) { -221 addColumn(scan, column); -222 } -223 } -224 -225 /** -226 * Calculates the splits that will serve as input for the map tasks. The -227 * number of splits matches the number of regions in a table. Splits are shuffled if -228 * required. -229 * @param context The current job context. -230 * @return The list of input splits. -231 * @throws IOException When creating the list of splits fails. -232 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( -233 * org.apache.hadoop.mapreduce.JobContext) -234 */ -235 @Override -236 public List<InputSplit> getSplits(JobContext context) throws IOException { -237 List<InputSplit> splits = super.getSplits(context); -238 if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) { -239 Collections.shuffle(splits); -240 } -241 return splits; -242 } -243 -244 /** -245 * Convenience method to parse a string representation of an array of column specifiers. -246 * -247 * @param scan The Scan to update. -248 * @param columns The columns to parse. -249 */ -250 private static void addColumns(Scan scan, String columns) { -251 String[] cols = columns.split(" "); -252 for (String col : cols) { -253 addColumn(scan, Bytes.toBytes(col)); -254 } -255 } -256 -257 @Override -258 protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException { -259 if (conf.get(SPLIT_TABLE) != null) { -260 TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE)); -261 try (Connection conn = ConnectionFactory.createConnection(getConf())) { -262 try (RegionLocator rl = conn.getRegionLocator(splitTableName)) { -263 return rl.getStartEndKeys(); -264 } -265 } -266 } -267 -268 return super.getStartEndKeys(); -269 } -270 -271 /** -272 * Sets split table in map-reduce job. -273 */ -274 public static void configureSplitTable(Job job, TableName tableName) { -275 job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString()); -276 } -277} +119 Scan scan = null; +120 +121 if (conf.get(SCAN) != null) { +122 try { +123 scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN)); +124 } catch (IOException e) { +125 LOG.error("An error occurred.", e); +126 } +127 } else { +128 try { +129 scan = new Scan(); +130 +131 if (conf.get(SCAN_ROW_START) != null) { +132 scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START))); +133 } +134 +135 if (conf.get(SCAN_ROW_STOP) != null) { +136 scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP))); +137 } +138 +139 if (conf.get(SCAN_COLUMNS) != null) { +140 addColumns(scan, conf.get(SCAN_COLUMNS)); +141 } +142 +143 if (conf.get(SCAN_COLUMN_FAMILY) != null) { +144 scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY))); +145 } +146 +147 if (conf.get(SCAN_TIMESTAMP) != null) { +148 scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP))); +149 } +150 +151 if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) { +152 scan.setTimeRange( +153 Long.parseLong(conf.get(SCAN_TIMERANGE_START)), +154 Long.parseLong(conf.get(SCAN_TIMERANGE_END))); +155 } +156 +157 if (conf.get(SCAN_MAXVERSIONS) != null) { +158 scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS))); +159 } +160 +161 if (conf.get(SCAN_CACHEDROWS) != null) { +162 scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS))); +163 } +164 +165 if (conf.get(SCAN_BATCHSIZE) != null) { +166 scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE))); +167 } +168 +169 // false by default, full table scans generate too much BC churn +170 scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false))); +171 } catch (Exception e) { +172 LOG.error(StringUtils.stringifyException(e)); +173 } +174 } +175 +176 setScan(scan); +177 } +178 +179 @Override +180 protected void initialize(JobContext context) throws IOException { +181 // Do we have to worry about mis-matches between the Configuration from setConf and the one +182 // in this context? +183 TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); +184 try { +185 initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); +186 } catch (Exception e) { +187 LOG.error(StringUtils.stringifyException(e)); +188 } +189 } +190 +191 /** +192 * Parses a combined family and qualifier and adds either both or just the +193 * family in case there is no qualifier. This assumes the older colon +194 * divided notation, e.g. "family:qualifier". +195 * +196 * @param scan The Scan to update. +197 * @param familyAndQualifier family and qualifier +198 * @throws IllegalArgumentException When familyAndQualifier is invalid. +199 */ +200 private static void addColumn(Scan scan, byte[] familyAndQualifier) { +201 byte [][] fq = KeyValue.parseColumn(familyAndQualifier); +202 if (fq.length == 1) { +203 scan.addFamily(fq[0]); +204 } else if (fq.length == 2) { +205 scan.addColumn(fq[0], fq[1]); +206 } else { +207 throw new IllegalArgumentException("Invalid familyAndQualifier provided."); +208 } +209 } +210 +211 /** +212 * Adds an array of columns specified using old format, family:qualifier. +213 * <p> +214 * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])}for any families in the +215 * input. +216 * +217 * @param scan The Scan to update. +218 * @param columns array of columns, formatted as <code>family:qualifier</code> +219 * @see Scan#addColumn(byte[], byte[]) +220 */ +221 public static void addColumns(Scan scan, byte [][] columns) { +222 for (byte[] column : columns) { +223 addColumn(scan, column); +224 } +225 } +226 +227 /** +228 * Calculates the splits that will serve as input for the map tasks. The +229 * number of splits matches the number of regions in a table. Splits are shuffled if +230 * required. +231 * @param context The current job context. +232 * @return The list of input splits. +233 * @throws IOException When creating the list of splits fails. +234 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( +235 * org.apache.hadoop.mapreduce.JobContext) +236 */ +237 @Override +238 public List<InputSplit> getSplits(JobContext context) throws IOException { +239 List<InputSplit> splits = super.getSplits(context); +240 if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) { +241 Collections.shuffle(splits); +242 } +243 return splits; +244 } +245 +246 /** +247 * Convenience method to parse a string representation of an array of column specifiers. +248 * +249 * @param scan The Scan to update. +250 * @param columns The columns to parse. +251 */ +252 private static void addColumns(Scan scan, String columns) { +253 String[] cols = columns.split(" "); +254 for (String col : cols) { +255 addColumn(scan, Bytes.toBytes(col)); +256 } +257 } +258 +259 @Override +260 protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException { +261 if (conf.get(SPLIT_TABLE) != null) { +262 TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE)); +263 try (Connection conn = ConnectionFactory.createConnection(getConf())) { +264 try (RegionLocator rl = conn.getRegionLocator(splitTableName)) { +265 return rl.getStartEndKeys(); +266 } +267 } +268 } +269 +270 return super.getStartEndKeys(); +271 } +272 +273 /** +274 * Sets split table in map-reduce job. +275 */ +276 public static void configureSplitTable(Job job, TableName tableName) { +277 job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString()); +278 } +279}