Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 95FA42004F2 for ; Sat, 26 Aug 2017 10:55:58 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 94AC4167D7F; Sat, 26 Aug 2017 08:55:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 27B2F167D3E for ; Sat, 26 Aug 2017 10:55:56 +0200 (CEST) Received: (qmail 84518 invoked by uid 500); 26 Aug 2017 08:55:54 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 81136 invoked by uid 99); 26 Aug 2017 08:55:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 26 Aug 2017 08:55:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 71DD4F5F44; Sat, 26 Aug 2017 08:55:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: appy@apache.org To: commits@hbase.apache.org Date: Sat, 26 Aug 2017 08:56:06 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [20/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module. archived-at: Sat, 26 Aug 2017 08:55:58 -0000 http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/RowCounter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/RowCounter.java deleted file mode 100644 index 43560fd..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/RowCounter.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapred; - -import java.io.IOException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** - * A job with a map to count rows. - * Map outputs table rows IF the input row has columns that have content. - * Uses a org.apache.hadoop.mapred.lib.IdentityReducer - */ -@InterfaceAudience.Public -public class RowCounter extends Configured implements Tool { - // Name of this 'program' - static final String NAME = "rowcounter"; - - /** - * Mapper that runs the count. - */ - static class RowCounterMapper - implements TableMap { - private static enum Counters {ROWS} - - public void map(ImmutableBytesWritable row, Result values, - OutputCollector output, - Reporter reporter) - throws IOException { - // Count every row containing data, whether it's in qualifiers or values - reporter.incrCounter(Counters.ROWS, 1); - } - - public void configure(JobConf jc) { - // Nothing to do. - } - - public void close() throws IOException { - // Nothing to do. - } - } - - /** - * @param args - * @return the JobConf - * @throws IOException - */ - public JobConf createSubmittableJob(String[] args) throws IOException { - JobConf c = new JobConf(getConf(), getClass()); - c.setJobName(NAME); - // Columns are space delimited - StringBuilder sb = new StringBuilder(); - final int columnoffset = 2; - for (int i = columnoffset; i < args.length; i++) { - if (i > columnoffset) { - sb.append(" "); - } - sb.append(args[i]); - } - // Second argument is the table name. - TableMapReduceUtil.initTableMapJob(args[1], sb.toString(), - RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c); - c.setNumReduceTasks(0); - // First arg is the output directory. 
- FileOutputFormat.setOutputPath(c, new Path(args[0])); - return c; - } - - static int printUsage() { - System.out.println(NAME + - " [...]"); - return -1; - } - - public int run(final String[] args) throws Exception { - // Make sure there are at least 3 parameters - if (args.length < 3) { - System.err.println("ERROR: Wrong number of parameters: " + args.length); - return printUsage(); - } - JobClient.runJob(createSubmittableJob(args)); - return 0; - } - - /** - * @param args - * @throws Exception - */ - public static void main(String[] args) throws Exception { - int errCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(), args); - System.exit(errCode); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java deleted file mode 100644 index 208849a..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapred; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobConfigurable; -import org.apache.hadoop.util.StringUtils; - -/** - * Convert HBase tabular data into a format that is consumable by Map/Reduce. 
- */ -@InterfaceAudience.Public -public class TableInputFormat extends TableInputFormatBase implements - JobConfigurable { - private static final Log LOG = LogFactory.getLog(TableInputFormat.class); - - /** - * space delimited list of columns - */ - public static final String COLUMN_LIST = "hbase.mapred.tablecolumns"; - - public void configure(JobConf job) { - try { - initialize(job); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } - } - - @Override - protected void initialize(JobConf job) throws IOException { - Path[] tableNames = FileInputFormat.getInputPaths(job); - String colArg = job.get(COLUMN_LIST); - String[] colNames = colArg.split(" "); - byte [][] m_cols = new byte[colNames.length][]; - for (int i = 0; i < m_cols.length; i++) { - m_cols[i] = Bytes.toBytes(colNames[i]); - } - setInputColumns(m_cols); - Connection connection = ConnectionFactory.createConnection(job); - initializeTable(connection, TableName.valueOf(tableNames[0].getName())); - } - - public void validateInput(JobConf job) throws IOException { - // expecting exactly one path - Path [] tableNames = FileInputFormat.getInputPaths(job); - if (tableNames == null || tableNames.length > 1) { - throw new IOException("expecting one table name"); - } - - // connected to table? - if (getTable() == null) { - throw new IOException("could not connect to table '" + - tableNames[0].getName() + "'"); - } - - // expecting at least one column - String colArg = job.get(COLUMN_LIST); - if (colArg == null || colArg.length() == 0) { - throw new IOException("expecting at least one column"); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java deleted file mode 100644 index c65810f..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java +++ /dev/null @@ -1,313 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapred; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; - -/** - * A Base for {@link TableInputFormat}s. Receives a {@link Table}, a - * byte[] of input columns and optionally a {@link Filter}. - * Subclasses may use other TableRecordReader implementations. - * - * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to - * function properly. Each of the entry points to this class used by the MapReduce framework, - * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, - * will call {@link #initialize(JobConf)} as a convenient centralized location to handle - * retrieving the necessary configuration information. If your subclass overrides either of these - * methods, either call the parent version or call initialize yourself. - * - *
<p>
- * An example of a subclass:
- * <pre>
- *   class ExampleTIF extends TableInputFormatBase {
- *
- *     {@literal @}Override
- *     protected void initialize(JobConf context) throws IOException {
- *       // We are responsible for the lifecycle of this connection until we hand it over in
- *       // initializeTable.
- *       Connection connection =
- *          ConnectionFactory.createConnection(HBaseConfiguration.create(context));
- *       TableName tableName = TableName.valueOf("exampleTable");
- *       // mandatory. Once passed here, TableInputFormatBase will handle closing the connection.
- *       initializeTable(connection, tableName);
- *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- *         Bytes.toBytes("columnB") };
- *       // mandatory
- *       setInputColumns(inputColumns);
- *       // optional, by default we'll get everything for the given columns.
- *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- *       setRowFilter(exampleFilter);
- *     }
- *   }
- * </pre>
- */ - -@InterfaceAudience.Public -public abstract class TableInputFormatBase -implements InputFormat { - private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class); - private byte [][] inputColumns; - private Table table; - private RegionLocator regionLocator; - private Connection connection; - private TableRecordReader tableRecordReader; - private Filter rowFilter; - - private static final String NOT_INITIALIZED = "The input format instance has not been properly " + - "initialized. Ensure you call initializeTable either in your constructor or initialize " + - "method"; - private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + - " previous error. Please look at the previous logs lines from" + - " the task's full log for more details."; - - /** - * Builds a TableRecordReader. If no TableRecordReader was provided, uses - * the default. - * - * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, - * JobConf, Reporter) - */ - public RecordReader getRecordReader( - InputSplit split, JobConf job, Reporter reporter) - throws IOException { - // In case a subclass uses the deprecated approach or calls initializeTable directly - if (table == null) { - initialize(job); - } - // null check in case our child overrides getTable to not throw. - try { - if (getTable() == null) { - // initialize() must not have been implemented in the subclass. - throw new IOException(INITIALIZATION_ERROR); - } - } catch (IllegalStateException exception) { - throw new IOException(INITIALIZATION_ERROR, exception); - } - - TableSplit tSplit = (TableSplit) split; - // if no table record reader was provided use default - final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() : - this.tableRecordReader; - trr.setStartRow(tSplit.getStartRow()); - trr.setEndRow(tSplit.getEndRow()); - trr.setHTable(this.table); - trr.setInputColumns(this.inputColumns); - trr.setRowFilter(this.rowFilter); - trr.init(); - return new RecordReader() { - - @Override - public void close() throws IOException { - trr.close(); - closeTable(); - } - - @Override - public ImmutableBytesWritable createKey() { - return trr.createKey(); - } - - @Override - public Result createValue() { - return trr.createValue(); - } - - @Override - public long getPos() throws IOException { - return trr.getPos(); - } - - @Override - public float getProgress() throws IOException { - return trr.getProgress(); - } - - @Override - public boolean next(ImmutableBytesWritable key, Result value) throws IOException { - return trr.next(key, value); - } - }; - } - - /** - * Calculates the splits that will serve as input for the map tasks. - * - * Splits are created in number equal to the smallest between numSplits and - * the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table. - * If the number of splits is smaller than the number of - * {@link org.apache.hadoop.hbase.regionserver.HRegion}s then splits are spanned across - * multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s - * and are grouped the most evenly possible. In the - * case splits are uneven the bigger splits are placed first in the - * {@link InputSplit} array. - * - * @param job the map task {@link JobConf} - * @param numSplits a hint to calculate the number of splits (mapred.map.tasks). 
- * - * @return the input splits - * - * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int) - */ - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - if (this.table == null) { - initialize(job); - } - // null check in case our child overrides getTable to not throw. - try { - if (getTable() == null) { - // initialize() must not have been implemented in the subclass. - throw new IOException(INITIALIZATION_ERROR); - } - } catch (IllegalStateException exception) { - throw new IOException(INITIALIZATION_ERROR, exception); - } - - byte [][] startKeys = this.regionLocator.getStartKeys(); - if (startKeys == null || startKeys.length == 0) { - throw new IOException("Expecting at least one region"); - } - if (this.inputColumns == null || this.inputColumns.length == 0) { - throw new IOException("Expecting at least one column"); - } - int realNumSplits = numSplits > startKeys.length? startKeys.length: - numSplits; - InputSplit[] splits = new InputSplit[realNumSplits]; - int middle = startKeys.length / realNumSplits; - int startPos = 0; - for (int i = 0; i < realNumSplits; i++) { - int lastPos = startPos + middle; - lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos; - String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]). - getHostname(); - splits[i] = new TableSplit(this.table.getName(), - startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]: - HConstants.EMPTY_START_ROW, regionLocation); - LOG.info("split: " + i + "->" + splits[i]); - startPos = lastPos; - } - return splits; - } - - /** - * Allows subclasses to initialize the table information. - * - * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. - * @param tableName The {@link TableName} of the table to process. - * @throws IOException - */ - protected void initializeTable(Connection connection, TableName tableName) throws IOException { - if (this.table != null || this.connection != null) { - LOG.warn("initializeTable called multiple times. Overwriting connection and table " + - "reference; TableInputFormatBase will not close these old references when done."); - } - this.table = connection.getTable(tableName); - this.regionLocator = connection.getRegionLocator(tableName); - this.connection = connection; - } - - /** - * @param inputColumns to be passed in {@link Result} to the map task. - */ - protected void setInputColumns(byte [][] inputColumns) { - this.inputColumns = inputColumns; - } - - /** - * Allows subclasses to get the {@link Table}. - */ - protected Table getTable() { - if (table == null) { - throw new IllegalStateException(NOT_INITIALIZED); - } - return this.table; - } - - /** - * Allows subclasses to set the {@link TableRecordReader}. - * - * @param tableRecordReader - * to provide other {@link TableRecordReader} implementations. - */ - protected void setTableRecordReader(TableRecordReader tableRecordReader) { - this.tableRecordReader = tableRecordReader; - } - - /** - * Allows subclasses to set the {@link Filter} to be used. - * - * @param rowFilter - */ - protected void setRowFilter(Filter rowFilter) { - this.rowFilter = rowFilter; - } - - /** - * Handle subclass specific set up. 
- * Each of the entry points used by the MapReduce framework, - * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, - * will call {@link #initialize(JobConf)} as a convenient centralized location to handle - * retrieving the necessary configuration information and calling - * {@link #initializeTable(Connection, TableName)}. - * - * Subclasses should implement their initialize call such that it is safe to call multiple times. - * The current TableInputFormatBase implementation relies on a non-null table reference to decide - * if an initialize call is needed, but this behavior may change in the future. In particular, - * it is critical that initializeTable not be called multiple times since this will leak - * Connection instances. - * - */ - protected void initialize(JobConf job) throws IOException { - } - - /** - * Close the Table and related objects that were initialized via - * {@link #initializeTable(Connection, TableName)}. - * - * @throws IOException - */ - protected void closeTable() throws IOException { - close(table, connection); - table = null; - connection = null; - } - - private void close(Closeable... closables) throws IOException { - for (Closeable c : closables) { - if(c != null) { c.close(); } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java deleted file mode 100644 index a9f1e61..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapred; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.Mapper; - -/** - * Scan an HBase table to sort by a specified sort column. - * If the column does not exist, the record is not passed to Reduce. 
- * - * @param WritableComparable key class - * @param Writable value class - */ -@InterfaceAudience.Public -public interface TableMap, V> -extends Mapper { - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java deleted file mode 100644 index 63ec418..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java +++ /dev/null @@ -1,376 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapred; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.MutationSerialization; -import org.apache.hadoop.hbase.mapreduce.ResultSerialization; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.security.token.TokenUtil; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputFormat; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; - -/** - * Utility for {@link TableMap} and {@link TableReduce} - */ -@InterfaceAudience.Public -@SuppressWarnings({ "rawtypes", "unchecked" }) -public class TableMapReduceUtil { - - /** - * Use this before submitting a TableMap job. It will - * appropriately set up the JobConf. - * - * @param table The table name to read from. - * @param columns The columns to scan. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job configuration to adjust. 
- */ - public static void initTableMapJob(String table, String columns, - Class mapper, - Class outputKeyClass, - Class outputValueClass, JobConf job) { - initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, - true, TableInputFormat.class); - } - - public static void initTableMapJob(String table, String columns, - Class mapper, - Class outputKeyClass, - Class outputValueClass, JobConf job, boolean addDependencyJars) { - initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, - addDependencyJars, TableInputFormat.class); - } - - /** - * Use this before submitting a TableMap job. It will - * appropriately set up the JobConf. - * - * @param table The table name to read from. - * @param columns The columns to scan. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job configuration to adjust. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - */ - public static void initTableMapJob(String table, String columns, - Class mapper, - Class outputKeyClass, - Class outputValueClass, JobConf job, boolean addDependencyJars, - Class inputFormat) { - - job.setInputFormat(inputFormat); - job.setMapOutputValueClass(outputValueClass); - job.setMapOutputKeyClass(outputKeyClass); - job.setMapperClass(mapper); - job.setStrings("io.serializations", job.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName()); - FileInputFormat.addInputPaths(job, table); - job.set(TableInputFormat.COLUMN_LIST, columns); - if (addDependencyJars) { - try { - addDependencyJars(job); - } catch (IOException e) { - e.printStackTrace(); - } - } - try { - initCredentials(job); - } catch (IOException ioe) { - // just spit out the stack trace? really? - ioe.printStackTrace(); - } - } - - /** - * Sets up the job for reading from one or more multiple table snapshots, with one or more scans - * per snapshot. - * It bypasses hbase servers and read directly from snapshot files. - * - * @param snapshotScans map of snapshot name to scans on that snapshot. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - */ - public static void initMultiTableSnapshotMapperJob(Map> snapshotScans, - Class mapper, Class outputKeyClass, Class outputValueClass, - JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { - MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir); - - job.setInputFormat(MultiTableSnapshotInputFormat.class); - if (outputValueClass != null) { - job.setMapOutputValueClass(outputValueClass); - } - if (outputKeyClass != null) { - job.setMapOutputKeyClass(outputKeyClass); - } - job.setMapperClass(mapper); - if (addDependencyJars) { - addDependencyJars(job); - } - - org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job); - } - - /** - * Sets up the job for reading from a table snapshot. It bypasses hbase servers - * and read directly from snapshot files. 
- * - * @param snapshotName The name of the snapshot (of a table) to read from. - * @param columns The columns to scan. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should - * have write permissions to this directory, and this should not be a subdirectory of rootdir. - * After the job is finished, restore directory can be deleted. - * @throws IOException When setting up the details fails. - * @see TableSnapshotInputFormat - */ - public static void initTableSnapshotMapJob(String snapshotName, String columns, - Class mapper, - Class outputKeyClass, - Class outputValueClass, JobConf job, - boolean addDependencyJars, Path tmpRestoreDir) - throws IOException { - TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); - initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job, - addDependencyJars, TableSnapshotInputFormat.class); - org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job); - } - - /** - * Use this before submitting a TableReduce job. It will - * appropriately set up the JobConf. - * - * @param table The output table. - * @param reducer The reducer class to use. - * @param job The current job configuration to adjust. - * @throws IOException When determining the region count fails. - */ - public static void initTableReduceJob(String table, - Class reducer, JobConf job) - throws IOException { - initTableReduceJob(table, reducer, job, null); - } - - /** - * Use this before submitting a TableReduce job. It will - * appropriately set up the JobConf. - * - * @param table The output table. - * @param reducer The reducer class to use. - * @param job The current job configuration to adjust. - * @param partitioner Partitioner to use. Pass null to use - * default partitioner. - * @throws IOException When determining the region count fails. - */ - public static void initTableReduceJob(String table, - Class reducer, JobConf job, Class partitioner) - throws IOException { - initTableReduceJob(table, reducer, job, partitioner, true); - } - - /** - * Use this before submitting a TableReduce job. It will - * appropriately set up the JobConf. - * - * @param table The output table. - * @param reducer The reducer class to use. - * @param job The current job configuration to adjust. - * @param partitioner Partitioner to use. Pass null to use - * default partitioner. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - * @throws IOException When determining the region count fails. 
- */ - public static void initTableReduceJob(String table, - Class reducer, JobConf job, Class partitioner, - boolean addDependencyJars) throws IOException { - job.setOutputFormat(TableOutputFormat.class); - job.setReducerClass(reducer); - job.set(TableOutputFormat.OUTPUT_TABLE, table); - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(Put.class); - job.setStrings("io.serializations", job.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName()); - if (partitioner == HRegionPartitioner.class) { - job.setPartitionerClass(HRegionPartitioner.class); - int regions = - MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)); - if (job.getNumReduceTasks() > regions) { - job.setNumReduceTasks(regions); - } - } else if (partitioner != null) { - job.setPartitionerClass(partitioner); - } - if (addDependencyJars) { - addDependencyJars(job); - } - initCredentials(job); - } - - public static void initCredentials(JobConf job) throws IOException { - UserProvider userProvider = UserProvider.instantiate(job); - if (userProvider.isHadoopSecurityEnabled()) { - // propagate delegation related props from launcher job to MR job - if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { - job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION")); - } - } - - if (userProvider.isHBaseSecurityEnabled()) { - Connection conn = ConnectionFactory.createConnection(job); - try { - // login the server principal (if using secure Hadoop) - User user = userProvider.getCurrent(); - TokenUtil.addTokenForJob(conn, job, user); - } catch (InterruptedException ie) { - ie.printStackTrace(); - Thread.currentThread().interrupt(); - } finally { - conn.close(); - } - } - } - - /** - * Ensures that the given number of reduce tasks for the given job - * configuration does not exceed the number of regions for the given table. - * - * @param table The table to get the region count for. - * @param job The current job configuration to adjust. - * @throws IOException When retrieving the table details fails. - */ - // Used by tests. - public static void limitNumReduceTasks(String table, JobConf job) - throws IOException { - int regions = - MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)); - if (job.getNumReduceTasks() > regions) - job.setNumReduceTasks(regions); - } - - /** - * Ensures that the given number of map tasks for the given job - * configuration does not exceed the number of regions for the given table. - * - * @param table The table to get the region count for. - * @param job The current job configuration to adjust. - * @throws IOException When retrieving the table details fails. - */ - // Used by tests. - public static void limitNumMapTasks(String table, JobConf job) - throws IOException { - int regions = - MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)); - if (job.getNumMapTasks() > regions) - job.setNumMapTasks(regions); - } - - /** - * Sets the number of reduce tasks for the given job configuration to the - * number of regions the given table has. - * - * @param table The table to get the region count for. - * @param job The current job configuration to adjust. - * @throws IOException When retrieving the table details fails. 
- */ - public static void setNumReduceTasks(String table, JobConf job) - throws IOException { - job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), - TableName.valueOf(table))); - } - - /** - * Sets the number of map tasks for the given job configuration to the - * number of regions the given table has. - * - * @param table The table to get the region count for. - * @param job The current job configuration to adjust. - * @throws IOException When retrieving the table details fails. - */ - public static void setNumMapTasks(String table, JobConf job) - throws IOException { - job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), - TableName.valueOf(table))); - } - - /** - * Sets the number of rows to return and cache with each scanner iteration. - * Higher caching values will enable faster mapreduce jobs at the expense of - * requiring more heap to contain the cached rows. - * - * @param job The current job configuration to adjust. - * @param batchSize The number of rows to return in batch with each scanner - * iteration. - */ - public static void setScannerCaching(JobConf job, int batchSize) { - job.setInt("hbase.client.scanner.caching", batchSize); - } - - /** - * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job) - */ - public static void addDependencyJars(JobConf job) throws IOException { - org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job); - org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses( - job, - // when making changes here, consider also mapreduce.TableMapReduceUtil - // pull job classes - job.getMapOutputKeyClass(), - job.getMapOutputValueClass(), - job.getOutputKeyClass(), - job.getOutputValueClass(), - job.getPartitionerClass(), - job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class), - job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class), - job.getCombinerClass()); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java deleted file mode 100644 index 8878eee..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapred; - -import java.io.IOException; - -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.InvalidJobConfException; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.Progressable; - -/** - * Convert Map/Reduce output and write it to an HBase table - */ -@InterfaceAudience.Public -public class TableOutputFormat extends FileOutputFormat { - - /** JobConf parameter that specifies the output table */ - public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; - - /** - * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) - * and write to an HBase table. - */ - protected static class TableRecordWriter implements RecordWriter { - private BufferedMutator m_mutator; - private Connection conn; - - - /** - * Instantiate a TableRecordWriter with the HBase HClient for writing. - * - * @deprecated Please use {@code #TableRecordWriter(JobConf)} This version does not clean up - * connections and will leak connections (removed in 2.0) - */ - @Deprecated - public TableRecordWriter(final BufferedMutator mutator) throws IOException { - this.m_mutator = mutator; - this.conn = null; - } - - /** - * Instantiate a TableRecordWriter with a BufferedMutator for batch writing. - */ - public TableRecordWriter(JobConf job) throws IOException { - // expecting exactly one path - TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE)); - try { - this.conn = ConnectionFactory.createConnection(job); - this.m_mutator = conn.getBufferedMutator(tableName); - } finally { - if (this.m_mutator == null) { - conn.close(); - conn = null; - } - } - } - - public void close(Reporter reporter) throws IOException { - try { - if (this.m_mutator != null) { - this.m_mutator.close(); - } - } finally { - if (conn != null) { - this.conn.close(); - } - } - } - - public void write(ImmutableBytesWritable key, Put value) throws IOException { - m_mutator.mutate(new Put(value)); - } - } - - /** - * Creates a new record writer. - * - * Be aware that the baseline javadoc gives the impression that there is a single - * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new - * RecordWriter per call of this method. You must close the returned RecordWriter when done. - * Failure to do so will drop writes. - * - * @param ignored Ignored filesystem - * @param job Current JobConf - * @param name Name of the job - * @param progress - * @return The newly created writer instance. - * @throws IOException When creating the writer fails. - */ - @Override - public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, - Progressable progress) - throws IOException { - // Clear write buffer on fail is true by default so no need to reset it. 
- return new TableRecordWriter(job); - } - - @Override - public void checkOutputSpecs(FileSystem ignored, JobConf job) - throws FileAlreadyExistsException, InvalidJobConfException, IOException { - String tableName = job.get(OUTPUT_TABLE); - if (tableName == null) { - throw new IOException("Must specify table name"); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java deleted file mode 100644 index cecef7d..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapred; - -import java.io.IOException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.RecordReader; - - -/** - * Iterate over an HBase table data, return (Text, RowResult) pairs - */ -@InterfaceAudience.Public -public class TableRecordReader -implements RecordReader { - - private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl(); - - /** - * Restart from survivable exceptions by creating a new scanner. - * - * @param firstRow - * @throws IOException - */ - public void restart(byte[] firstRow) throws IOException { - this.recordReaderImpl.restart(firstRow); - } - - /** - * Build the scanner. Not done in constructor to allow for extension. - * - * @throws IOException - */ - public void init() throws IOException { - this.recordReaderImpl.restart(this.recordReaderImpl.getStartRow()); - } - - /** - * @param htable the {@link org.apache.hadoop.hbase.HTableDescriptor} to scan. - */ - public void setHTable(Table htable) { - this.recordReaderImpl.setHTable(htable); - } - - /** - * @param inputColumns the columns to be placed in {@link Result}. 
- */ - public void setInputColumns(final byte [][] inputColumns) { - this.recordReaderImpl.setInputColumns(inputColumns); - } - - /** - * @param startRow the first row in the split - */ - public void setStartRow(final byte [] startRow) { - this.recordReaderImpl.setStartRow(startRow); - } - - /** - * - * @param endRow the last row in the split - */ - public void setEndRow(final byte [] endRow) { - this.recordReaderImpl.setEndRow(endRow); - } - - /** - * @param rowFilter the {@link Filter} to be used. - */ - public void setRowFilter(Filter rowFilter) { - this.recordReaderImpl.setRowFilter(rowFilter); - } - - public void close() { - this.recordReaderImpl.close(); - } - - /** - * @return ImmutableBytesWritable - * - * @see org.apache.hadoop.mapred.RecordReader#createKey() - */ - public ImmutableBytesWritable createKey() { - return this.recordReaderImpl.createKey(); - } - - /** - * @return RowResult - * - * @see org.apache.hadoop.mapred.RecordReader#createValue() - */ - public Result createValue() { - return this.recordReaderImpl.createValue(); - } - - public long getPos() { - - // This should be the ordinal tuple in the range; - // not clear how to calculate... - return this.recordReaderImpl.getPos(); - } - - public float getProgress() { - // Depends on the total number of tuples and getPos - return this.recordReaderImpl.getPos(); - } - - /** - * @param key HStoreKey as input key. - * @param value MapWritable as input value - * @return true if there was more data - * @throws IOException - */ - public boolean next(ImmutableBytesWritable key, Result value) - throws IOException { - return this.recordReaderImpl.next(key, value); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java deleted file mode 100644 index f6b79c3..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java +++ /dev/null @@ -1,259 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapred; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; - -import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; - -/** - * Iterate over an HBase table data, return (Text, RowResult) pairs - */ -@InterfaceAudience.Public -public class TableRecordReaderImpl { - private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class); - - private byte [] startRow; - private byte [] endRow; - private byte [] lastSuccessfulRow; - private Filter trrRowFilter; - private ResultScanner scanner; - private Table htable; - private byte [][] trrInputColumns; - private long timestamp; - private int rowcount; - private boolean logScannerActivity = false; - private int logPerRowCount = 100; - - /** - * Restart from survivable exceptions by creating a new scanner. - * - * @param firstRow - * @throws IOException - */ - public void restart(byte[] firstRow) throws IOException { - Scan currentScan; - if ((endRow != null) && (endRow.length > 0)) { - if (trrRowFilter != null) { - Scan scan = new Scan(firstRow, endRow); - TableInputFormat.addColumns(scan, trrInputColumns); - scan.setFilter(trrRowFilter); - scan.setCacheBlocks(false); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } else { - LOG.debug("TIFB.restart, firstRow: " + - Bytes.toStringBinary(firstRow) + ", endRow: " + - Bytes.toStringBinary(endRow)); - Scan scan = new Scan(firstRow, endRow); - TableInputFormat.addColumns(scan, trrInputColumns); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } - } else { - LOG.debug("TIFB.restart, firstRow: " + - Bytes.toStringBinary(firstRow) + ", no endRow"); - - Scan scan = new Scan(firstRow); - TableInputFormat.addColumns(scan, trrInputColumns); - scan.setFilter(trrRowFilter); - this.scanner = this.htable.getScanner(scan); - currentScan = scan; - } - if (logScannerActivity) { - LOG.info("Current scan=" + currentScan.toString()); - timestamp = System.currentTimeMillis(); - rowcount = 0; - } - } - - /** - * Build the scanner. Not done in constructor to allow for extension. - * - * @throws IOException - */ - public void init() throws IOException { - restart(startRow); - } - - byte[] getStartRow() { - return this.startRow; - } - /** - * @param htable the {@link org.apache.hadoop.hbase.HTableDescriptor} to scan. - */ - public void setHTable(Table htable) { - Configuration conf = htable.getConfiguration(); - logScannerActivity = conf.getBoolean( - ScannerCallable.LOG_SCANNER_ACTIVITY, false); - logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); - this.htable = htable; - } - - /** - * @param inputColumns the columns to be placed in {@link Result}. 
- */ - public void setInputColumns(final byte [][] inputColumns) { - this.trrInputColumns = inputColumns; - } - - /** - * @param startRow the first row in the split - */ - public void setStartRow(final byte [] startRow) { - this.startRow = startRow; - } - - /** - * - * @param endRow the last row in the split - */ - public void setEndRow(final byte [] endRow) { - this.endRow = endRow; - } - - /** - * @param rowFilter the {@link Filter} to be used. - */ - public void setRowFilter(Filter rowFilter) { - this.trrRowFilter = rowFilter; - } - - public void close() { - if (this.scanner != null) { - this.scanner.close(); - } - try { - this.htable.close(); - } catch (IOException ioe) { - LOG.warn("Error closing table", ioe); - } - } - - /** - * @return ImmutableBytesWritable - * - * @see org.apache.hadoop.mapred.RecordReader#createKey() - */ - public ImmutableBytesWritable createKey() { - return new ImmutableBytesWritable(); - } - - /** - * @return RowResult - * - * @see org.apache.hadoop.mapred.RecordReader#createValue() - */ - public Result createValue() { - return new Result(); - } - - public long getPos() { - // This should be the ordinal tuple in the range; - // not clear how to calculate... - return 0; - } - - public float getProgress() { - // Depends on the total number of tuples and getPos - return 0; - } - - /** - * @param key HStoreKey as input key. - * @param value MapWritable as input value - * @return true if there was more data - * @throws IOException - */ - public boolean next(ImmutableBytesWritable key, Result value) - throws IOException { - Result result; - try { - try { - result = this.scanner.next(); - if (logScannerActivity) { - rowcount ++; - if (rowcount >= logPerRowCount) { - long now = System.currentTimeMillis(); - LOG.info("Mapper took " + (now-timestamp) - + "ms to process " + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } - } - } catch (IOException e) { - // do not retry if the exception tells us not to do so - if (e instanceof DoNotRetryIOException) { - throw e; - } - // try to handle all other IOExceptions by restarting - // the scanner, if the second call fails, it will be rethrown - LOG.debug("recovered from " + StringUtils.stringifyException(e)); - if (lastSuccessfulRow == null) { - LOG.warn("We are restarting the first next() invocation," + - " if your mapper has restarted a few other times like this" + - " then you should consider killing this job and investigate" + - " why it's taking so long."); - } - if (lastSuccessfulRow == null) { - restart(startRow); - } else { - restart(lastSuccessfulRow); - this.scanner.next(); // skip presumed already mapped row - } - result = this.scanner.next(); - } - - if (result != null && result.size() > 0) { - key.set(result.getRow()); - lastSuccessfulRow = key.get(); - value.copyFrom(result); - return true; - } - return false; - } catch (IOException ioe) { - if (logScannerActivity) { - long now = System.currentTimeMillis(); - LOG.info("Mapper took " + (now-timestamp) - + "ms to process " + rowcount + " rows"); - LOG.info(ioe); - String lastRow = lastSuccessfulRow == null ? 
- "null" : Bytes.toStringBinary(lastSuccessfulRow); - LOG.info("lastSuccessfulRow=" + lastRow); - } - throw ioe; - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java deleted file mode 100644 index 91fb4a1..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapred; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.Reducer; - -/** - * Write a table, sorting by the input key - * - * @param key class - * @param value class - */ -@InterfaceAudience.Public -@SuppressWarnings("unchecked") -public interface TableReduce -extends Reducer { - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java deleted file mode 100644 index d7b49ff..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.mapred; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; - -/** - * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. Further - * documentation available on {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}. - * - * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat - */ -@InterfaceAudience.Public -public class TableSnapshotInputFormat implements InputFormat { - - public static class TableSnapshotRegionSplit implements InputSplit { - private TableSnapshotInputFormatImpl.InputSplit delegate; - - // constructor for mapreduce framework / Writable - public TableSnapshotRegionSplit() { - this.delegate = new TableSnapshotInputFormatImpl.InputSplit(); - } - - public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) { - this.delegate = delegate; - } - - public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo, - List locations, Scan scan, Path restoreDir) { - this.delegate = - new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); - } - - @Override - public long getLength() throws IOException { - return delegate.getLength(); - } - - @Override - public String[] getLocations() throws IOException { - return delegate.getLocations(); - } - - @Override - public void write(DataOutput out) throws IOException { - delegate.write(out); - } - - @Override - public void readFields(DataInput in) throws IOException { - delegate.readFields(in); - } - } - - static class TableSnapshotRecordReader - implements RecordReader { - - private TableSnapshotInputFormatImpl.RecordReader delegate; - - public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job) - throws IOException { - delegate = new TableSnapshotInputFormatImpl.RecordReader(); - delegate.initialize(split.delegate, job); - } - - @Override - public boolean next(ImmutableBytesWritable key, Result value) throws IOException { - if (!delegate.nextKeyValue()) { - return false; - } - ImmutableBytesWritable currentKey = delegate.getCurrentKey(); - key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength()); - value.copyFrom(delegate.getCurrentValue()); - return true; - } - - @Override - public ImmutableBytesWritable createKey() { - return new ImmutableBytesWritable(); - } - - @Override - public Result createValue() { - return new Result(); - } - - @Override - public long getPos() throws IOException { - return delegate.getPos(); - } - - @Override - public void close() throws IOException { - delegate.close(); - } - - @Override - public float getProgress() throws IOException { - return delegate.getProgress(); - } - } - - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - List splits = - TableSnapshotInputFormatImpl.getSplits(job); - 
InputSplit[] results = new InputSplit[splits.size()]; - for (int i = 0; i < splits.size(); i++) { - results[i] = new TableSnapshotRegionSplit(splits.get(i)); - } - return results; - } - - @Override - public RecordReader - getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { - return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job); - } - - /** - * Configures the job to use TableSnapshotInputFormat to read from a snapshot. - * @param job the job to configure - * @param snapshotName the name of the snapshot to read from - * @param restoreDir a temporary directory to restore the snapshot into. Current user should - * have write permissions to this directory, and this should not be a subdirectory of rootdir. - * After the job is finished, restoreDir can be deleted. - * @throws IOException if an error occurs - */ - public static void setInput(JobConf job, String snapshotName, Path restoreDir) - throws IOException { - TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java deleted file mode 100644 index 0784e5e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
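
Usage note on the snapshot input format moved above (illustrative, not part of this patch): wiring a mapred job to read from a snapshot only requires selecting the input format and calling setInput() as documented in its javadoc. The snapshot name, restore directory, and wrapper class in this sketch are placeholders.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class SnapshotJobSetup {
  /** Returns a JobConf that scans the named snapshot instead of the live table. */
  public static JobConf configure(String snapshotName, Path restoreDir) throws IOException {
    JobConf job = new JobConf(HBaseConfiguration.create(), SnapshotJobSetup.class);
    job.setInputFormat(TableSnapshotInputFormat.class);
    // restoreDir must be writable by the submitting user, must not sit under the HBase root dir,
    // and can be deleted once the job finishes.
    TableSnapshotInputFormat.setInput(job, snapshotName, restoreDir);
    return job;
  }
}
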
- */ -package org.apache.hadoop.hbase.mapred; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Arrays; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.InputSplit; - -/** - * A table split corresponds to a key range [low, high) - */ -@InterfaceAudience.Public -public class TableSplit implements InputSplit, Comparable { - private TableName m_tableName; - private byte [] m_startRow; - private byte [] m_endRow; - private String m_regionLocation; - - /** default constructor */ - public TableSplit() { - this((TableName)null, HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, ""); - } - - /** - * Constructor - * @param tableName - * @param startRow - * @param endRow - * @param location - */ - public TableSplit(TableName tableName, byte [] startRow, byte [] endRow, - final String location) { - this.m_tableName = tableName; - this.m_startRow = startRow; - this.m_endRow = endRow; - this.m_regionLocation = location; - } - - public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow, - final String location) { - this(TableName.valueOf(tableName), startRow, endRow, - location); - } - - /** @return table name */ - public TableName getTable() { - return this.m_tableName; - } - - /** @return table name */ - public byte [] getTableName() { - return this.m_tableName.getName(); - } - - /** @return starting row key */ - public byte [] getStartRow() { - return this.m_startRow; - } - - /** @return end row key */ - public byte [] getEndRow() { - return this.m_endRow; - } - - /** @return the region's hostname */ - public String getRegionLocation() { - return this.m_regionLocation; - } - - public String[] getLocations() { - return new String[] {this.m_regionLocation}; - } - - public long getLength() { - // Not clear how to obtain this... 
seems to be used only for sorting splits - return 0; - } - - public void readFields(DataInput in) throws IOException { - this.m_tableName = TableName.valueOf(Bytes.readByteArray(in)); - this.m_startRow = Bytes.readByteArray(in); - this.m_endRow = Bytes.readByteArray(in); - this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); - } - - public void write(DataOutput out) throws IOException { - Bytes.writeByteArray(out, this.m_tableName.getName()); - Bytes.writeByteArray(out, this.m_startRow); - Bytes.writeByteArray(out, this.m_endRow); - Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("HBase table split("); - sb.append("table name: ").append(m_tableName); - sb.append(", start row: ").append(Bytes.toStringBinary(m_startRow)); - sb.append(", end row: ").append(Bytes.toStringBinary(m_endRow)); - sb.append(", region location: ").append(m_regionLocation); - sb.append(")"); - return sb.toString(); - } - - @Override - public int compareTo(TableSplit o) { - return Bytes.compareTo(getStartRow(), o.getStartRow()); - } - - @Override - public boolean equals(Object o) { - if (o == null || !(o instanceof TableSplit)) { - return false; - } - TableSplit other = (TableSplit)o; - return m_tableName.equals(other.m_tableName) && - Bytes.equals(m_startRow, other.m_startRow) && - Bytes.equals(m_endRow, other.m_endRow) && - m_regionLocation.equals(other.m_regionLocation); - } - - @Override - public int hashCode() { - int result = m_tableName != null ? m_tableName.hashCode() : 0; - result = 31 * result + Arrays.hashCode(m_startRow); - result = 31 * result + Arrays.hashCode(m_endRow); - result = 31 * result + (m_regionLocation != null ? m_regionLocation.hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java deleted file mode 100644 index 8a2a363..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** -Provides HBase MapReduce -Input/OutputFormats, a table indexing MapReduce job, and utility methods. - -

See HBase and MapReduce -in the HBase Reference Guide for mapreduce over hbase documentation. -*/ -package org.apache.hadoop.hbase.mapred; http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java deleted file mode 100644 index 078033e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java +++ /dev/null @@ -1,333 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.PrefixFilter; -import org.apache.hadoop.hbase.filter.RegexStringComparator; -import org.apache.hadoop.hbase.filter.RowFilter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; - - -/** - * A job with a a map and reduce phase to count cells in a table. - * The counter lists the following stats for a given table: - *

- * 1. Total number of rows in the table
- * 2. Total number of CFs across all rows
- * 3. Total qualifiers across all rows
- * 4. Total occurrence of each CF
- * 5. Total occurrence of each qualifier
- * 6. Total number of versions of each qualifier.
- * 
- * - * The cellcounter can take optional parameters to use a user - * supplied row/family/qualifier string to use in the report and - * second a regex based or prefix based row filter to restrict the - * count operation to a limited subset of rows from the table or a - * start time and/or end time to limit the count to a time range. - */ -@InterfaceAudience.Public -public class CellCounter extends Configured implements Tool { - private static final Log LOG = - LogFactory.getLog(CellCounter.class.getName()); - - - /** - * Name of this 'program'. - */ - static final String NAME = "CellCounter"; - - private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - - /** - * Mapper that runs the count. - */ - static class CellCounterMapper - extends TableMapper { - /** - * Counter enumeration to count the actual rows. - */ - public static enum Counters { - ROWS, - CELLS - } - - private Configuration conf; - private String separator; - - // state of current row, family, column needs to persist across map() invocations - // in order to properly handle scanner batching, where a single qualifier may have too - // many versions for a single map() call - private byte[] lastRow; - private String currentRowKey; - byte[] currentFamily = null; - String currentFamilyName = null; - byte[] currentQualifier = null; - // family + qualifier - String currentQualifierName = null; - // rowkey + family + qualifier - String currentRowQualifierName = null; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - conf = context.getConfiguration(); - separator = conf.get("ReportSeparator",":"); - } - - /** - * Maps the data. - * - * @param row The current table row key. - * @param values The columns. - * @param context The current context. - * @throws IOException When something is broken with the data. 
- * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, - * org.apache.hadoop.mapreduce.Mapper.Context) - */ - - @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", - justification="Findbugs is blind to the Precondition null check") - public void map(ImmutableBytesWritable row, Result values, - Context context) - throws IOException { - Preconditions.checkState(values != null, - "values passed to the map is null"); - - try { - byte[] currentRow = values.getRow(); - if (lastRow == null || !Bytes.equals(lastRow, currentRow)) { - lastRow = currentRow; - currentRowKey = Bytes.toStringBinary(currentRow); - currentFamily = null; - currentQualifier = null; - context.getCounter(Counters.ROWS).increment(1); - context.write(new Text("Total ROWS"), new IntWritable(1)); - } - if (!values.isEmpty()) { - int cellCount = 0; - for (Cell value : values.listCells()) { - cellCount++; - if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) { - currentFamily = CellUtil.cloneFamily(value); - currentFamilyName = Bytes.toStringBinary(currentFamily); - currentQualifier = null; - context.getCounter("CF", currentFamilyName).increment(1); - if (1 == context.getCounter("CF", currentFamilyName).getValue()) { - context.write(new Text("Total Families Across all Rows"), new IntWritable(1)); - context.write(new Text(currentFamily), new IntWritable(1)); - } - } - if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) { - currentQualifier = CellUtil.cloneQualifier(value); - currentQualifierName = currentFamilyName + separator + - Bytes.toStringBinary(currentQualifier); - currentRowQualifierName = currentRowKey + separator + currentQualifierName; - - context.write(new Text("Total Qualifiers across all Rows"), - new IntWritable(1)); - context.write(new Text(currentQualifierName), new IntWritable(1)); - } - // Increment versions - context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1)); - } - context.getCounter(Counters.CELLS).increment(cellCount); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - static class IntSumReducer extends Reducer { - - private IntWritable result = new IntWritable(); - public void reduce(Key key, Iterable values, - Context context) - throws IOException, InterruptedException { - int sum = 0; - for (IntWritable val : values) { - sum += val.get(); - } - result.set(sum); - context.write(key, result); - } - } - - /** - * Sets up the actual job. - * - * @param conf The current configuration. - * @param args The command line parameters. - * @return The newly created job. - * @throws IOException When setting up the job fails. - */ - public static Job createSubmittableJob(Configuration conf, String[] args) - throws IOException { - String tableName = args[0]; - Path outputDir = new Path(args[1]); - String reportSeparatorString = (args.length > 2) ? 
args[2]: ":"; - conf.set("ReportSeparator", reportSeparatorString); - Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); - job.setJarByClass(CellCounter.class); - Scan scan = getConfiguredScanForJob(conf, args); - TableMapReduceUtil.initTableMapperJob(tableName, scan, - CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job); - job.setNumReduceTasks(1); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(IntWritable.class); - job.setOutputFormatClass(TextOutputFormat.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - FileOutputFormat.setOutputPath(job, outputDir); - job.setReducerClass(IntSumReducer.class); - return job; - } - - private static Scan getConfiguredScanForJob(Configuration conf, String[] args) - throws IOException { - // create scan with any properties set from TableInputFormat - Scan s = TableInputFormat.createScanFromConfiguration(conf); - // Set Scan Versions - if (conf.get(TableInputFormat.SCAN_MAXVERSIONS) == null) { - // default to all versions unless explicitly set - s.setMaxVersions(Integer.MAX_VALUE); - } - s.setCacheBlocks(false); - // Set RowFilter or Prefix Filter if applicable. - Filter rowFilter = getRowFilter(args); - if (rowFilter!= null) { - LOG.info("Setting Row Filter for counter."); - s.setFilter(rowFilter); - } - // Set TimeRange if defined - long timeRange[] = getTimeRange(args); - if (timeRange != null) { - LOG.info("Setting TimeRange for counter."); - s.setTimeRange(timeRange[0], timeRange[1]); - } - return s; - } - - - private static Filter getRowFilter(String[] args) { - Filter rowFilter = null; - String filterCriteria = (args.length > 3) ? args[3]: null; - if (filterCriteria == null) return null; - if (filterCriteria.startsWith("^")) { - String regexPattern = filterCriteria.substring(1, filterCriteria.length()); - rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern)); - } else { - rowFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria)); - } - return rowFilter; - } - - private static long[] getTimeRange(String[] args) throws IOException { - final String startTimeArgKey = "--starttime="; - final String endTimeArgKey = "--endtime="; - long startTime = 0L; - long endTime = 0L; - - for (int i = 1; i < args.length; i++) { - System.out.println("i:" + i + "arg[i]" + args[i]); - if (args[i].startsWith(startTimeArgKey)) { - startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); - } - if (args[i].startsWith(endTimeArgKey)) { - endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); - } - } - - if (startTime == 0 && endTime == 0) - return null; - - endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime; - return new long [] {startTime, endTime}; - } - - @Override - public int run(String[] args) throws Exception { - if (args.length < 2) { - System.err.println("ERROR: Wrong number of parameters: " + args.length); - System.err.println("Usage: CellCounter "); - System.err.println(" [^[regex pattern] or " + - "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]"); - System.err.println(" Note: -D properties will be applied to the conf used. 
"); - System.err.println(" Additionally, all of the SCAN properties from TableInputFormat"); - System.err.println(" can be specified to get fine grained control on what is counted.."); - System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "="); - System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "="); - System.err.println(" -D " + TableInputFormat.SCAN_COLUMNS + "=\" ...\""); - System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=,, ..."); - System.err.println(" -D " + TableInputFormat.SCAN_TIMESTAMP + "="); - System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_START + "="); - System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_END + "="); - System.err.println(" -D " + TableInputFormat.SCAN_MAXVERSIONS + "="); - System.err.println(" -D " + TableInputFormat.SCAN_CACHEDROWS + "="); - System.err.println(" -D " + TableInputFormat.SCAN_BATCHSIZE + "="); - System.err.println(" parameter can be used to override the default report separator " + - "string : used to separate the rowId/column family name and qualifier name."); - System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " + - "operation to a limited subset of rows from the table based on regex or prefix pattern."); - return -1; - } - Job job = createSubmittableJob(getConf(), args); - return (job.waitForCompletion(true) ? 0 : 1); - } - - /** - * Main entry point. - * @param args The command line parameters. - * @throws Exception When running the job fails. - */ - public static void main(String[] args) throws Exception { - int errCode = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(), args); - System.exit(errCode); - } - -}