From: appy@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm
Date: Sat, 26 Aug 2017 08:56:02 -0000
Message-Id: <7b037e6a5c80433c843df740e04119d5@git.apache.org>
Subject: [16/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
deleted file mode 100644
index e18b3aa..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.RegionSizeCalculator; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.util.Map; -import java.util.HashMap; -import java.util.Iterator; -/** - * A base for {@link MultiTableInputFormat}s. Receives a list of - * {@link Scan} instances that define the input tables and - * filters etc. Subclasses may use other TableRecordReader implementations. - */ -@InterfaceAudience.Public -public abstract class MultiTableInputFormatBase extends - InputFormat { - - private static final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class); - - /** Holds the set of scans used to define the input. */ - private List scans; - - /** The reader scanning the table, can be a custom one. */ - private TableRecordReader tableRecordReader = null; - - /** - * Builds a TableRecordReader. If no TableRecordReader was provided, uses the - * default. - * - * @param split The split to work with. - * @param context The current context. - * @return The newly created record reader. - * @throws IOException When creating the reader fails. - * @throws InterruptedException when record reader initialization fails - * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( - * org.apache.hadoop.mapreduce.InputSplit, - * org.apache.hadoop.mapreduce.TaskAttemptContext) - */ - @Override - public RecordReader createRecordReader( - InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - TableSplit tSplit = (TableSplit) split; - LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength())); - - if (tSplit.getTable() == null) { - throw new IOException("Cannot create a record reader because of a" - + " previous error. 
Please look at the previous logs lines from" - + " the task's full log for more details."); - } - final Connection connection = ConnectionFactory.createConnection(context.getConfiguration()); - Table table = connection.getTable(tSplit.getTable()); - - if (this.tableRecordReader == null) { - this.tableRecordReader = new TableRecordReader(); - } - final TableRecordReader trr = this.tableRecordReader; - - try { - Scan sc = tSplit.getScan(); - sc.setStartRow(tSplit.getStartRow()); - sc.setStopRow(tSplit.getEndRow()); - trr.setScan(sc); - trr.setTable(table); - return new RecordReader() { - - @Override - public void close() throws IOException { - trr.close(); - connection.close(); - } - - @Override - public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { - return trr.getCurrentKey(); - } - - @Override - public Result getCurrentValue() throws IOException, InterruptedException { - return trr.getCurrentValue(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return trr.getProgress(); - } - - @Override - public void initialize(InputSplit inputsplit, TaskAttemptContext context) - throws IOException, InterruptedException { - trr.initialize(inputsplit, context); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return trr.nextKeyValue(); - } - }; - } catch (IOException ioe) { - // If there is an exception make sure that all - // resources are closed and released. - trr.close(); - connection.close(); - throw ioe; - } - } - - /** - * Calculates the splits that will serve as input for the map tasks. The - * number of splits matches the number of regions in a table. - * - * @param context The current job context. - * @return The list of input splits. - * @throws IOException When creating the list of splits fails. 
- * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext) - */ - @Override - public List getSplits(JobContext context) throws IOException { - if (scans.isEmpty()) { - throw new IOException("No scans were provided."); - } - - Map> tableMaps = new HashMap<>(); - for (Scan scan : scans) { - byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME); - if (tableNameBytes == null) - throw new IOException("A scan object did not have a table name"); - - TableName tableName = TableName.valueOf(tableNameBytes); - - List scanList = tableMaps.get(tableName); - if (scanList == null) { - scanList = new ArrayList<>(); - tableMaps.put(tableName, scanList); - } - scanList.add(scan); - } - - List splits = new ArrayList<>(); - Iterator iter = tableMaps.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry> entry = (Map.Entry>) iter.next(); - TableName tableName = entry.getKey(); - List scanList = entry.getValue(); - - try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration()); - Table table = conn.getTable(tableName); - RegionLocator regionLocator = conn.getRegionLocator(tableName)) { - RegionSizeCalculator sizeCalculator = new RegionSizeCalculator( - regionLocator, conn.getAdmin()); - Pair keys = regionLocator.getStartEndKeys(); - for (Scan scan : scanList) { - if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - throw new IOException("Expecting at least one region for table : " - + tableName.getNameAsString()); - } - int count = 0; - - byte[] startRow = scan.getStartRow(); - byte[] stopRow = scan.getStopRow(); - - for (int i = 0; i < keys.getFirst().length; i++) { - if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { - continue; - } - - if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || - Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && - (stopRow.length == 0 || Bytes.compareTo(stopRow, - keys.getFirst()[i]) > 0)) { - byte[] splitStart = startRow.length == 0 || - Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? - keys.getFirst()[i] : startRow; - byte[] splitStop = (stopRow.length == 0 || - Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && - keys.getSecond()[i].length > 0 ? - keys.getSecond()[i] : stopRow; - - HRegionLocation hregionLocation = regionLocator.getRegionLocation( - keys.getFirst()[i], false); - String regionHostname = hregionLocation.getHostname(); - HRegionInfo regionInfo = hregionLocation.getRegionInfo(); - String encodedRegionName = regionInfo.getEncodedName(); - long regionSize = sizeCalculator.getRegionSize( - regionInfo.getRegionName()); - - TableSplit split = new TableSplit(table.getName(), - scan, splitStart, splitStop, regionHostname, - encodedRegionName, regionSize); - - splits.add(split); - - if (LOG.isDebugEnabled()) - LOG.debug("getSplits: split -> " + (count++) + " -> " + split); - } - } - } - } - } - - return splits; - } - - /** - * Test if the given region is to be included in the InputSplit while - * splitting the regions of a table. - *
- * This optimization is effective when there is a specific reasoning to
- * exclude an entire region from the M-R job, (and hence, not contributing to
- * the InputSplit), given the start and end keys of the same.
- * Useful when we need to remember the last-processed top record and revisit
- * the [last, current) interval for M-R processing, continuously. In addition
- * to reducing InputSplits, reduces the load on the region server as well, due
- * to the ordering of the keys.
- *
- * Note: It is possible that endKey.length() == 0 , for the last
- * (recent) region.
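For orientation only (this is not part of the deleted file): a minimal subclass sketch of the pattern the comment above describes. The class, table names and cut-off row are placeholders; scans are tagged with Scan.SCAN_ATTRIBUTES_TABLE_NAME because getSplits() below requires it.

    // assumed imports, as in the file above: java.util.ArrayList, java.util.List,
    // org.apache.hadoop.hbase.client.Scan, org.apache.hadoop.hbase.util.Bytes
    public class CutoffMultiTableInputFormat extends MultiTableInputFormatBase {
      public void configureScans() {                       // hypothetical setup hook
        List<Scan> scans = new ArrayList<>();
        for (String name : new String[] { "table_a", "table_b" }) {  // example table names
          Scan scan = new Scan();
          scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(name));
          scans.add(scan);
        }
        setScans(scans);                                   // protected setter defined below
      }

      @Override
      protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        // Skip regions that end before an example cut-off row; an empty end key
        // means the last region, which is always kept.
        byte[] cutoff = Bytes.toBytes("row-2017-08");
        return endKey.length == 0 || Bytes.compareTo(endKey, cutoff) > 0;
      }
    }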
- * Override this method, if you want to bulk exclude regions altogether from - * M-R. By default, no region is excluded( i.e. all regions are included). - * - * @param startKey Start key of the region - * @param endKey End key of the region - * @return true, if this region needs to be included as part of the input - * (default). - */ - protected boolean includeRegionInSplit(final byte[] startKey, - final byte[] endKey) { - return true; - } - - /** - * Allows subclasses to get the list of {@link Scan} objects. - */ - protected List getScans() { - return this.scans; - } - - /** - * Allows subclasses to set the list of {@link Scan} objects. - * - * @param scans The list of {@link Scan} used to define the input - */ - protected void setScans(List scans) { - this.scans = scans; - } - - /** - * Allows subclasses to set the {@link TableRecordReader}. - * - * @param tableRecordReader A different {@link TableRecordReader} - * implementation. - */ - protected void setTableRecordReader(TableRecordReader tableRecordReader) { - this.tableRecordReader = tableRecordReader; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java deleted file mode 100644 index 4cc784f..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - *
- * Hadoop output format that writes to one or more HBase tables. The key is
- * taken to be the table name while the output value must be either a
- * {@link Put} or a {@link Delete} instance. All tables must already exist, and
- * all Puts and Deletes must reference only valid column families.
- *
- * Write-ahead logging (WAL) for Puts can be disabled by setting
- * {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}.
- * Note that disabling write-ahead logging is only appropriate for jobs where
- * loss of data due to region server failure can be tolerated (for example,
- * because it is easy to rerun a bulk import).
- *
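For orientation, a minimal driver sketch that wires up this output format; the mapper class and table names are placeholders, not taken from the patch.

    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "multi-table-writer");
    job.setJarByClass(RoutingMapper.class);               // hypothetical mapper emitting <table, mutation>
    job.setMapperClass(RoutingMapper.class);
    job.setNumReduceTasks(0);                             // map-only job
    job.setOutputFormatClass(MultiTableOutputFormat.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);  // key: destination table name
    job.setOutputValueClass(Mutation.class);              // value: a Put or a Delete
    // Inside the mapper, each mutation is routed to its table:
    //   context.write(new ImmutableBytesWritable(Bytes.toBytes("table_a")), put);
    // Only disable the WAL if the job can simply be rerun after a region server failure:
    //   job.getConfiguration().setBoolean(MultiTableOutputFormat.WAL_PROPERTY, MultiTableOutputFormat.WAL_OFF);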
- */ -@InterfaceAudience.Public -public class MultiTableOutputFormat extends OutputFormat { - /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */ - public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal"; - /** Property value to use write-ahead logging */ - public static final boolean WAL_ON = true; - /** Property value to disable write-ahead logging */ - public static final boolean WAL_OFF = false; - /** - * Record writer for outputting to multiple HTables. - */ - protected static class MultiTableRecordWriter extends - RecordWriter { - private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class); - Connection connection; - Map mutatorMap = new HashMap<>(); - Configuration conf; - boolean useWriteAheadLogging; - - /** - * @param conf - * HBaseConfiguration to used - * @param useWriteAheadLogging - * whether to use write ahead logging. This can be turned off ( - * false) to improve performance when bulk loading data. - */ - public MultiTableRecordWriter(Configuration conf, - boolean useWriteAheadLogging) throws IOException { - LOG.debug("Created new MultiTableRecordReader with WAL " - + (useWriteAheadLogging ? "on" : "off")); - this.conf = conf; - this.useWriteAheadLogging = useWriteAheadLogging; - } - - /** - * @param tableName - * the name of the table, as a string - * @return the named mutator - * @throws IOException - * if there is a problem opening a table - */ - BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException { - if(this.connection == null){ - this.connection = ConnectionFactory.createConnection(conf); - } - if (!mutatorMap.containsKey(tableName)) { - LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing"); - - BufferedMutator mutator = - connection.getBufferedMutator(TableName.valueOf(tableName.get())); - mutatorMap.put(tableName, mutator); - } - return mutatorMap.get(tableName); - } - - @Override - public void close(TaskAttemptContext context) throws IOException { - for (BufferedMutator mutator : mutatorMap.values()) { - mutator.close(); - } - if (connection != null) { - connection.close(); - } - } - - /** - * Writes an action (Put or Delete) to the specified table. - * - * @param tableName - * the table being updated. - * @param action - * the update, either a put or a delete. - * @throws IllegalArgumentException - * if the action is not a put or a delete. - */ - @Override - public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException { - BufferedMutator mutator = getBufferedMutator(tableName); - // The actions are not immutable, so we defensively copy them - if (action instanceof Put) { - Put put = new Put((Put) action); - put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL - : Durability.SKIP_WAL); - mutator.mutate(put); - } else if (action instanceof Delete) { - Delete delete = new Delete((Delete) action); - mutator.mutate(delete); - } else - throw new IllegalArgumentException( - "action must be either Delete or Put"); - } - } - - @Override - public void checkOutputSpecs(JobContext context) throws IOException, - InterruptedException { - // we can't know ahead of time if it's going to blow up when the user - // passes a table name that doesn't exist, so nothing useful here. 
- } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) - throws IOException, InterruptedException { - return new TableOutputCommitter(); - } - - @Override - public RecordWriter getRecordWriter(TaskAttemptContext context) - throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - return new MultiTableRecordWriter(HBaseConfiguration.create(conf), - conf.getBoolean(WAL_PROPERTY, WAL_ON)); - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java deleted file mode 100644 index 0f07a58..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -/** - * MultiTableSnapshotInputFormat generalizes - * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} - * allowing a MapReduce job to run over one or more table snapshots, with one or more scans - * configured for each. - * Internally, the input format delegates to - * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} - * and thus has the same performance advantages; - * see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for - * more details. - * Usage is similar to TableSnapshotInputFormat, with the following exception: - * initMultiTableSnapshotMapperJob takes in a map - * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding - * scan will be applied; - * the overall dataset for the job is defined by the concatenation of the regions and tables - * included in each snapshot/scan - * pair. 
- * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#initMultiTableSnapshotMapperJob - * (java.util.Map, Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean, org.apache - * .hadoop.fs.Path)} - * can be used to configure the job. - *
- * <pre>{@code
- * Job job = new Job(conf);
- * Map> snapshotScans = ImmutableMap.of(
- *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
- *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
- * );
- * Path restoreDir = new Path("/tmp/snapshot_restore_dir")
- * TableMapReduceUtil.initTableSnapshotMapperJob(
- *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
- *      MyMapOutputValueWritable.class, job, true, restoreDir);
- * }
- * </pre>
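The quoted example configures the job with initTableSnapshotMapperJob; the multi-snapshot entry point named just above it is TableMapReduceUtil.initMultiTableSnapshotMapperJob. A hedged sketch of the same setup using that method (mapper and output classes are placeholders, as in the quoted example; ImmutableMap/ImmutableList are Guava helpers):

    Job job = Job.getInstance(HBaseConfiguration.create(), "multi-snapshot-scan");
    Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
        "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
        "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))));
    TableMapReduceUtil.initMultiTableSnapshotMapperJob(
        snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
        MyMapOutputValueWritable.class, job, true, new Path("/tmp/snapshot_restore_dir"));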
- * Internally, this input format restores each snapshot into a subdirectory of the given tmp - * directory. Input splits and - * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce - * .TableSnapshotInputFormat} - * (one per region). - * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on - * permissioning; the - * same caveats apply here. - * - * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat - * @see org.apache.hadoop.hbase.client.TableSnapshotScanner - */ -@InterfaceAudience.Public -public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat { - - private final MultiTableSnapshotInputFormatImpl delegate; - - public MultiTableSnapshotInputFormat() { - this.delegate = new MultiTableSnapshotInputFormatImpl(); - } - - @Override - public List getSplits(JobContext jobContext) - throws IOException, InterruptedException { - List splits = - delegate.getSplits(jobContext.getConfiguration()); - List rtn = Lists.newArrayListWithCapacity(splits.size()); - - for (TableSnapshotInputFormatImpl.InputSplit split : splits) { - rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split)); - } - - return rtn; - } - - public static void setInput(Configuration configuration, - Map> snapshotScans, Path tmpRestoreDir) throws IOException { - new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java deleted file mode 100644 index 4331c0f..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; -import org.apache.hadoop.hbase.snapshot.SnapshotManifest; -import org.apache.hadoop.hbase.util.ConfigurationUtil; -import org.apache.hadoop.hbase.util.FSUtils; - -import java.io.IOException; -import java.util.AbstractMap; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -/** - * Shared implementation of mapreduce code over multiple table snapshots. - * Utilized by both mapreduce ({@link org.apache.hadoop.hbase.mapreduce - * .MultiTableSnapshotInputFormat} and mapred - * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations. - */ -@InterfaceAudience.LimitedPrivate({ "HBase" }) -@InterfaceStability.Evolving -public class MultiTableSnapshotInputFormatImpl { - - private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class); - - public static final String RESTORE_DIRS_KEY = - "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping"; - public static final String SNAPSHOT_TO_SCANS_KEY = - "hbase.MultiTableSnapshotInputFormat.snapshotsToScans"; - - /** - * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of - * restoreDir. 
- * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY} - * - * @param conf - * @param snapshotScans - * @param restoreDir - * @throws IOException - */ - public void setInput(Configuration conf, Map> snapshotScans, - Path restoreDir) throws IOException { - Path rootDir = FSUtils.getRootDir(conf); - FileSystem fs = rootDir.getFileSystem(conf); - - setSnapshotToScans(conf, snapshotScans); - Map restoreDirs = - generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir); - setSnapshotDirs(conf, restoreDirs); - restoreSnapshots(conf, restoreDirs, fs); - } - - /** - * Return the list of splits extracted from the scans/snapshots pushed to conf by - * {@link - * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)} - * - * @param conf Configuration to determine splits from - * @return Return the list of splits extracted from the scans/snapshots pushed to conf - * @throws IOException - */ - public List getSplits(Configuration conf) - throws IOException { - Path rootDir = FSUtils.getRootDir(conf); - FileSystem fs = rootDir.getFileSystem(conf); - - List rtn = Lists.newArrayList(); - - Map> snapshotsToScans = getSnapshotsToScans(conf); - Map snapshotsToRestoreDirs = getSnapshotDirs(conf); - for (Map.Entry> entry : snapshotsToScans.entrySet()) { - String snapshotName = entry.getKey(); - - Path restoreDir = snapshotsToRestoreDirs.get(snapshotName); - - SnapshotManifest manifest = - TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs); - List regionInfos = - TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest); - - for (Scan scan : entry.getValue()) { - List splits = - TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf); - rtn.addAll(splits); - } - } - return rtn; - } - - /** - * Retrieve the snapshot name -> list<scan> mapping pushed to configuration by - * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)} - * - * @param conf Configuration to extract name -> list<scan> mappings from. 
- * @return the snapshot name -> list<scan> mapping pushed to configuration - * @throws IOException - */ - public Map> getSnapshotsToScans(Configuration conf) throws IOException { - - Map> rtn = Maps.newHashMap(); - - for (Map.Entry entry : ConfigurationUtil - .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) { - String snapshotName = entry.getKey(); - String scan = entry.getValue(); - - Collection snapshotScans = rtn.get(snapshotName); - if (snapshotScans == null) { - snapshotScans = Lists.newArrayList(); - rtn.put(snapshotName, snapshotScans); - } - - snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan)); - } - - return rtn; - } - - /** - * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY}) - * - * @param conf - * @param snapshotScans - * @throws IOException - */ - public void setSnapshotToScans(Configuration conf, Map> snapshotScans) - throws IOException { - // flatten out snapshotScans for serialization to the job conf - List> snapshotToSerializedScans = Lists.newArrayList(); - - for (Map.Entry> entry : snapshotScans.entrySet()) { - String snapshotName = entry.getKey(); - Collection scans = entry.getValue(); - - // serialize all scans and map them to the appropriate snapshot - for (Scan scan : scans) { - snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName, - TableMapReduceUtil.convertScanToString(scan))); - } - } - - ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans); - } - - /** - * Retrieve the directories into which snapshots have been restored from - * ({@link #RESTORE_DIRS_KEY}) - * - * @param conf Configuration to extract restore directories from - * @return the directories into which snapshots have been restored from - * @throws IOException - */ - public Map getSnapshotDirs(Configuration conf) throws IOException { - List> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY); - Map rtn = Maps.newHashMapWithExpectedSize(kvps.size()); - - for (Map.Entry kvp : kvps) { - rtn.put(kvp.getKey(), new Path(kvp.getValue())); - } - - return rtn; - } - - public void setSnapshotDirs(Configuration conf, Map snapshotDirs) { - Map toSet = Maps.newHashMap(); - - for (Map.Entry entry : snapshotDirs.entrySet()) { - toSet.put(entry.getKey(), entry.getValue().toString()); - } - - ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet()); - } - - /** - * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and - * return a map from the snapshot to the restore directory. 
- * - * @param snapshots collection of snapshot names to restore - * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored - * @return a mapping from snapshot name to the directory in which that snapshot has been restored - */ - private Map generateSnapshotToRestoreDirMapping(Collection snapshots, - Path baseRestoreDir) { - Map rtn = Maps.newHashMap(); - - for (String snapshotName : snapshots) { - Path restoreSnapshotDir = - new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString()); - rtn.put(snapshotName, restoreSnapshotDir); - } - - return rtn; - } - - /** - * Restore each (snapshot name, restore directory) pair in snapshotToDir - * - * @param conf configuration to restore with - * @param snapshotToDir mapping from snapshot names to restore directories - * @param fs filesystem to do snapshot restoration on - * @throws IOException - */ - public void restoreSnapshots(Configuration conf, Map snapshotToDir, FileSystem fs) - throws IOException { - // TODO: restore from record readers to parallelize. - Path rootDir = FSUtils.getRootDir(conf); - - for (Map.Entry entry : snapshotToDir.entrySet()) { - String snapshotName = entry.getKey(); - Path restoreDir = entry.getValue(); - LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir - + " for MultiTableSnapshotInputFormat"); - restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs); - } - } - - void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir, - FileSystem fs) throws IOException { - RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java deleted file mode 100644 index d1dba1d..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java +++ /dev/null @@ -1,301 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.MapContext; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.StatusReporter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.util.ReflectionUtils; - - -/** - * Multithreaded implementation for @link org.apache.hbase.mapreduce.TableMapper - *
- * It can be used instead when the Map operation is not CPU
- * bound in order to improve throughput.
- *
- * Mapper implementations using this MapRunnable must be thread-safe.
- *
- * The Map-Reduce job has to be configured with the mapper to use via
- * {@link #setMapperClass} and the number of thread the thread-pool can use with the
- * {@link #getNumberOfThreads} method. The default value is 10 threads.
- *
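A hedged configuration sketch for this class, assuming a thread-safe mapper of your own (MyThreadSafeMapper, the table name and the thread count are examples):

    Scan scan = new Scan();
    TableMapReduceUtil.initTableMapperJob("example_table", scan,
        MultithreadedTableMapper.class, ImmutableBytesWritable.class, Put.class, job);
    // Plug in the real mapper and size the pool; both setters are defined below.
    MultithreadedTableMapper.setMapperClass(job, MyThreadSafeMapper.class);
    MultithreadedTableMapper.setNumberOfThreads(job, 16);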
- */ - -public class MultithreadedTableMapper extends TableMapper { - private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class); - private Class> mapClass; - private Context outer; - private ExecutorService executor; - public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads"; - public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass"; - - /** - * The number of threads in the thread pool that will run the map function. - * @param job the job - * @return the number of threads - */ - public static int getNumberOfThreads(JobContext job) { - return job.getConfiguration(). - getInt(NUMBER_OF_THREADS, 10); - } - - /** - * Set the number of threads in the pool for running maps. - * @param job the job to modify - * @param threads the new number of threads - */ - public static void setNumberOfThreads(Job job, int threads) { - job.getConfiguration().setInt(NUMBER_OF_THREADS, - threads); - } - - /** - * Get the application's mapper class. - * @param the map's output key type - * @param the map's output value type - * @param job the job - * @return the mapper class to run - */ - @SuppressWarnings("unchecked") - public static - Class> getMapperClass(JobContext job) { - return (Class>) - job.getConfiguration().getClass( MAPPER_CLASS, - Mapper.class); - } - - /** - * Set the application's mapper class. - * @param the map output key type - * @param the map output value type - * @param job the job to modify - * @param cls the class to use as the mapper - */ - public static - void setMapperClass(Job job, - Class> cls) { - if (MultithreadedTableMapper.class.isAssignableFrom(cls)) { - throw new IllegalArgumentException("Can't have recursive " + - "MultithreadedTableMapper instances."); - } - job.getConfiguration().setClass(MAPPER_CLASS, - cls, Mapper.class); - } - - /** - * Run the application's maps using a thread pool. 
- */ - @Override - public void run(Context context) throws IOException, InterruptedException { - outer = context; - int numberOfThreads = getNumberOfThreads(context); - mapClass = getMapperClass(context); - if (LOG.isDebugEnabled()) { - LOG.debug("Configuring multithread runner to use " + numberOfThreads + - " threads"); - } - executor = Executors.newFixedThreadPool(numberOfThreads); - for(int i=0; i < numberOfThreads; ++i) { - MapRunner thread = new MapRunner(context); - executor.execute(thread); - } - executor.shutdown(); - while (!executor.isTerminated()) { - // wait till all the threads are done - Thread.sleep(1000); - } - } - - private class SubMapRecordReader - extends RecordReader { - private ImmutableBytesWritable key; - private Result value; - private Configuration conf; - - @Override - public void close() throws IOException { - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return 0; - } - - @Override - public void initialize(InputSplit split, - TaskAttemptContext context - ) throws IOException, InterruptedException { - conf = context.getConfiguration(); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - synchronized (outer) { - if (!outer.nextKeyValue()) { - return false; - } - key = ReflectionUtils.copy(outer.getConfiguration(), - outer.getCurrentKey(), key); - value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value); - return true; - } - } - - public ImmutableBytesWritable getCurrentKey() { - return key; - } - - @Override - public Result getCurrentValue() { - return value; - } - } - - private class SubMapRecordWriter extends RecordWriter { - - @Override - public void close(TaskAttemptContext context) throws IOException, - InterruptedException { - } - - @Override - public void write(K2 key, V2 value) throws IOException, - InterruptedException { - synchronized (outer) { - outer.write(key, value); - } - } - } - - private class SubMapStatusReporter extends StatusReporter { - - @Override - public Counter getCounter(Enum name) { - return outer.getCounter(name); - } - - @Override - public Counter getCounter(String group, String name) { - return outer.getCounter(group, name); - } - - @Override - public void progress() { - outer.progress(); - } - - @Override - public void setStatus(String status) { - outer.setStatus(status); - } - - public float getProgress() { - return 0; - } - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION", - justification="Don't understand why FB is complaining about this one. 
We do throw exception") - private class MapRunner implements Runnable { - private Mapper mapper; - private Context subcontext; - - @SuppressWarnings({ "rawtypes", "unchecked" }) - MapRunner(Context context) throws IOException, InterruptedException { - mapper = ReflectionUtils.newInstance(mapClass, - context.getConfiguration()); - try { - Constructor c = context.getClass().getConstructor( - Mapper.class, - Configuration.class, - TaskAttemptID.class, - RecordReader.class, - RecordWriter.class, - OutputCommitter.class, - StatusReporter.class, - InputSplit.class); - c.setAccessible(true); - subcontext = (Context) c.newInstance( - mapper, - outer.getConfiguration(), - outer.getTaskAttemptID(), - new SubMapRecordReader(), - new SubMapRecordWriter(), - context.getOutputCommitter(), - new SubMapStatusReporter(), - outer.getInputSplit()); - } catch (Exception e) { - try { - Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor( - Configuration.class, - TaskAttemptID.class, - RecordReader.class, - RecordWriter.class, - OutputCommitter.class, - StatusReporter.class, - InputSplit.class); - c.setAccessible(true); - MapContext mc = (MapContext) c.newInstance( - outer.getConfiguration(), - outer.getTaskAttemptID(), - new SubMapRecordReader(), - new SubMapRecordWriter(), - context.getOutputCommitter(), - new SubMapStatusReporter(), - outer.getInputSplit()); - Class wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper"); - Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class); - subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc); - } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION - // rethrow as IOE - throw new IOException(e); - } - } - } - - @Override - public void run() { - try { - mapper.run(subcontext); - } catch (Throwable ie) { - LOG.error("Problem in running map.", ie); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java deleted file mode 100644 index 8997da9..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serialization; -import org.apache.hadoop.io.serializer.Serializer; - -@InterfaceAudience.Public -public class MutationSerialization implements Serialization { - @Override - public boolean accept(Class c) { - return Mutation.class.isAssignableFrom(c); - } - - @Override - public Deserializer getDeserializer(Class c) { - return new MutationDeserializer(); - } - - @Override - public Serializer getSerializer(Class c) { - return new MutationSerializer(); - } - - private static class MutationDeserializer implements Deserializer { - private InputStream in; - - @Override - public void close() throws IOException { - in.close(); - } - - @Override - public Mutation deserialize(Mutation mutation) throws IOException { - MutationProto proto = MutationProto.parseDelimitedFrom(in); - return ProtobufUtil.toMutation(proto); - } - - @Override - public void open(InputStream in) throws IOException { - this.in = in; - } - - } - private static class MutationSerializer implements Serializer { - private OutputStream out; - - @Override - public void close() throws IOException { - out.close(); - } - - @Override - public void open(OutputStream out) throws IOException { - this.out = out; - } - - @Override - public void serialize(Mutation mutation) throws IOException { - MutationType type; - if (mutation instanceof Put) { - type = MutationType.PUT; - } else if (mutation instanceof Delete) { - type = MutationType.DELETE; - } else { - throw new IllegalArgumentException("Only Put and Delete are supported"); - } - ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java deleted file mode 100644 index f01e84f..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.List; -import java.util.Map.Entry; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.mapreduce.Reducer; - -/** - * Combine Puts. Merges Put instances grouped by K into a single - * instance. - * @see TableMapReduceUtil - */ -@InterfaceAudience.Public -public class PutCombiner extends Reducer { - private static final Log LOG = LogFactory.getLog(PutCombiner.class); - - @Override - protected void reduce(K row, Iterable vals, Context context) - throws IOException, InterruptedException { - // Using HeapSize to create an upper bound on the memory size of - // the puts and flush some portion of the content while looping. This - // flush could result in multiple Puts for a single rowkey. That is - // acceptable because Combiner is run as an optimization and it's not - // critical that all Puts are grouped perfectly. - long threshold = context.getConfiguration().getLong( - "putcombiner.row.threshold", 1L * (1<<30)); - int cnt = 0; - long curSize = 0; - Put put = null; - Map> familyMap = null; - for (Put p : vals) { - cnt++; - if (put == null) { - put = p; - familyMap = put.getFamilyCellMap(); - } else { - for (Entry> entry : p.getFamilyCellMap() - .entrySet()) { - List cells = familyMap.get(entry.getKey()); - List kvs = (cells != null) ? (List) cells : null; - for (Cell cell : entry.getValue()) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - curSize += kv.heapSize(); - if (kvs != null) { - kvs.add(kv); - } - } - if (cells == null) { - familyMap.put(entry.getKey(), entry.getValue()); - } - } - if (cnt % 10 == 0) context.setStatus("Combine " + cnt); - if (curSize > threshold) { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1)); - } - context.write(row, put); - put = null; - curSize = 0; - cnt = 0; - } - } - } - if (put != null) { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1)); - } - context.write(row, put); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java deleted file mode 100644 index 17ab9cb..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.TreeSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ArrayBackedTag; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.TagType; -import org.apache.hadoop.hbase.TagUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.security.visibility.CellVisibility; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.util.StringUtils; - -/** - * Emits sorted Puts. - * Reads in all Puts from passed Iterator, sorts them, then emits - * Puts in sorted order. If lots of columns per row, it will use lots of - * memory sorting. - * @see HFileOutputFormat2 - * @see KeyValueSortReducer - */ -@InterfaceAudience.Public -public class PutSortReducer extends - Reducer { - // the cell creator - private CellCreator kvCreator; - - @Override - protected void - setup(Reducer.Context context) - throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - this.kvCreator = new CellCreator(conf); - } - - @Override - protected void reduce( - ImmutableBytesWritable row, - java.lang.Iterable puts, - Reducer.Context context) - throws java.io.IOException, InterruptedException - { - // although reduce() is called per-row, handle pathological case - long threshold = context.getConfiguration().getLong( - "putsortreducer.row.threshold", 1L * (1<<30)); - Iterator iter = puts.iterator(); - while (iter.hasNext()) { - TreeSet map = new TreeSet<>(CellComparator.COMPARATOR); - long curSize = 0; - // stop at the end or the RAM threshold - List tags = new ArrayList<>(); - while (iter.hasNext() && curSize < threshold) { - // clear the tags - tags.clear(); - Put p = iter.next(); - long t = p.getTTL(); - if (t != Long.MAX_VALUE) { - // add TTL tag if found - tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t))); - } - byte[] acl = p.getACL(); - if (acl != null) { - // add ACL tag if found - tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl)); - } - try { - CellVisibility cellVisibility = p.getCellVisibility(); - if (cellVisibility != null) { - // add the visibility labels if any - tags.addAll(kvCreator.getVisibilityExpressionResolver() - .createVisibilityExpTags(cellVisibility.getExpression())); - } - } catch (DeserializationException e) { - // We just throw exception here. Should we allow other mutations to proceed by - // just ignoring the bad one? 
- throw new IOException("Invalid visibility expression found in mutation " + p, e); - } - for (List cells: p.getFamilyCellMap().values()) { - for (Cell cell: cells) { - // Creating the KV which needs to be directly written to HFiles. Using the Facade - // KVCreator for creation of kvs. - KeyValue kv = null; - TagUtil.carryForwardTags(tags, cell); - if (!tags.isEmpty()) { - kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(), - cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), - cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(), - cell.getValueOffset(), cell.getValueLength(), tags); - } else { - kv = KeyValueUtil.ensureKeyValue(cell); - } - if (map.add(kv)) {// don't count duplicated kv into size - curSize += kv.heapSize(); - } - } - } - } - context.setStatus("Read " + map.size() + " entries of " + map.getClass() - + "(" + StringUtils.humanReadableInt(curSize) + ")"); - int index = 0; - for (KeyValue kv : map) { - context.write(row, kv); - if (++index % 100 == 0) - context.setStatus("Wrote " + index); - } - - // if we have more entries to process - if (iter.hasNext()) { - // force flush because we cannot guarantee intra-row sorted order - context.write(null, null); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java deleted file mode 100644 index dff04b6..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
deleted file mode 100644
index dff04b6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-@InterfaceAudience.Public
-public class ResultSerialization extends Configured implements Serialization<Result> {
-  private static final Log LOG = LogFactory.getLog(ResultSerialization.class);
-  // The following configuration property indicates import file format version.
-  public static final String IMPORT_FORMAT_VER = "hbase.import.version";
-
-  @Override
-  public boolean accept(Class<?> c) {
-    return Result.class.isAssignableFrom(c);
-  }
-
-  @Override
-  public Deserializer<Result> getDeserializer(Class<Result> c) {
-    // check input format version
-    Configuration conf = getConf();
-    if (conf != null) {
-      String inputVersion = conf.get(IMPORT_FORMAT_VER);
-      if (inputVersion != null && inputVersion.equals("0.94")) {
-        LOG.info("Load exported file using deserializer for HBase 0.94 format");
-        return new Result94Deserializer();
-      }
-    }
-
-    return new ResultDeserializer();
-  }
-
-  @Override
-  public Serializer<Result> getSerializer(Class<Result> c) {
-    return new ResultSerializer();
-  }
-
-  /**
-   * The following deserializer class is used to load exported file of 0.94
-   */
-  private static class Result94Deserializer implements Deserializer<Result> {
-    private DataInputStream in;
-
-    @Override
-    public void close() throws IOException {
-      in.close();
-    }
-
-    @Override
-    public Result deserialize(Result mutation) throws IOException {
-      int totalBuffer = in.readInt();
-      if (totalBuffer == 0) {
-        return Result.EMPTY_RESULT;
-      }
-      byte[] buf = new byte[totalBuffer];
-      readChunked(in, buf, 0, totalBuffer);
-      List<Cell> kvs = new ArrayList<>();
-      int offset = 0;
-      while (offset < totalBuffer) {
-        int keyLength = Bytes.toInt(buf, offset);
-        offset += Bytes.SIZEOF_INT;
-        kvs.add(new KeyValue(buf, offset, keyLength));
-        offset += keyLength;
-      }
-      return Result.create(kvs);
-    }
-
-    @Override
-    public void open(InputStream in) throws IOException {
-      if (!(in instanceof DataInputStream)) {
-        throw new IOException("Wrong input stream instance passed in");
-      }
-      this.in = (DataInputStream) in;
-    }
-
-    private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException {
-      int maxRead = 8192;
-
-      for (; ofs < len; ofs += maxRead)
-        in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
-    }
-  }
-
-  private static class ResultDeserializer implements Deserializer<Result> {
-    private InputStream in;
-
-    @Override
-    public void close() throws IOException {
-      in.close();
-    }
-
-    @Override
-    public Result deserialize(Result mutation) throws IOException {
-      ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in);
-      return ProtobufUtil.toResult(proto);
-    }
-
-    @Override
-    public void open(InputStream in) throws IOException {
-      this.in = in;
-    }
-  }
-
-  private static class ResultSerializer implements Serializer<Result> {
-    private OutputStream out;
-
-    @Override
-    public void close() throws IOException {
-      out.close();
-    }
-
-    @Override
-    public void open(OutputStream out) throws IOException {
-      this.out = out;
-    }
-
-    @Override
-    public void serialize(Result result) throws IOException {
-      ProtobufUtil.toResult(result).writeDelimitedTo(out);
-    }
-  }
-}
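
Context: ResultSerialization is what lets Hadoop's serialization factory handle HBase Result values (the Export/Import tools rely on it), and hbase.import.version switches the deserializer to the 0.94 wire format for old export files. A small sketch of registering it on a job that reads exported SequenceFiles follows; the export path and job name are illustrative assumptions, not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class ResultSerializationSetup {

  /** Returns a Job whose input/shuffle layers can deserialize exported Result values. */
  public static Job newJobReadingExportedResults(Path exportDir) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Register ResultSerialization alongside whatever serializations are already configured,
    // mirroring what TableMapReduceUtil/Import do internally.
    conf.setStrings("io.serializations",
        conf.get("io.serializations"),
        ResultSerialization.class.getName());
    // For SequenceFiles written by an HBase 0.94 Export, request the legacy deserializer:
    // conf.set(ResultSerialization.IMPORT_FORMAT_VER, "0.94");
    Job job = Job.getInstance(conf, "read-exported-results");
    job.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.addInputPath(job, exportDir);
    return job;
  }
}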
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
deleted file mode 100644
index 2e0591e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A job with a just a map phase to count rows. Map outputs table rows IF the
- * input row has columns that have content.
- */
-@InterfaceAudience.Public
-public class RowCounter extends Configured implements Tool {
-
-  private static final Log LOG = LogFactory.getLog(RowCounter.class);
-
-  /** Name of this 'program'. */
-  static final String NAME = "rowcounter";
-
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-  private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count";
-
-  /**
-   * Mapper that runs the count.
-   */
-  static class RowCounterMapper
-      extends TableMapper<ImmutableBytesWritable, Result> {
-
-    /** Counter enumeration to count the actual rows. */
-    public static enum Counters {ROWS}
-
-    /**
-     * Maps the data.
-     *
-     * @param row The current table row key.
-     * @param values The columns.
-     * @param context The current context.
-     * @throws IOException When something is broken with the data.
-     * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context)
-     */
-    @Override
-    public void map(ImmutableBytesWritable row, Result values,
-        Context context)
-        throws IOException {
-      // Count every row containing data, whether it's in qualifiers or values
-      context.getCounter(Counters.ROWS).increment(1);
-    }
-  }
-
-  /**
-   * Sets up the actual job.
-   *
-   * @param conf The current configuration.
-   * @param args The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  public static Job createSubmittableJob(Configuration conf, String[] args)
-      throws IOException {
-    String tableName = args[0];
-    List<MultiRowRangeFilter.RowRange> rowRangeList = null;
-    long startTime = 0;
-    long endTime = 0;
-
-    StringBuilder sb = new StringBuilder();
-
-    final String rangeSwitch = "--range=";
-    final String startTimeArgKey = "--starttime=";
-    final String endTimeArgKey = "--endtime=";
-    final String expectedCountArg = "--expected-count=";
-
-    // First argument is table name, starting from second
-    for (int i = 1; i < args.length; i++) {
-      if (args[i].startsWith(rangeSwitch)) {
-        try {
-          rowRangeList = parseRowRangeParameter(args[i], rangeSwitch);
-        } catch (IllegalArgumentException e) {
-          return null;
-        }
-        continue;
-      }
-      if (args[i].startsWith(startTimeArgKey)) {
-        startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
-        continue;
-      }
-      if (args[i].startsWith(endTimeArgKey)) {
-        endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
-        continue;
-      }
-      if (args[i].startsWith(expectedCountArg)) {
-        conf.setLong(EXPECTED_COUNT_KEY,
-            Long.parseLong(args[i].substring(expectedCountArg.length())));
-        continue;
-      }
-      // if no switch, assume column names
-      sb.append(args[i]);
-      sb.append(" ");
-    }
-    if (endTime < startTime) {
-      printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
-      return null;
-    }
-
-    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
-    job.setJarByClass(RowCounter.class);
-    Scan scan = new Scan();
-    scan.setCacheBlocks(false);
-    setScanFilter(scan, rowRangeList);
-    if (sb.length() > 0) {
-      for (String columnName : sb.toString().trim().split(" ")) {
-        String family = StringUtils.substringBefore(columnName, ":");
-        String qualifier = StringUtils.substringAfter(columnName, ":");
-
-        if (StringUtils.isBlank(qualifier)) {
-          scan.addFamily(Bytes.toBytes(family));
-        }
-        else {
-          scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
-        }
-      }
-    }
-    scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
-    job.setOutputFormatClass(NullOutputFormat.class);
-    TableMapReduceUtil.initTableMapperJob(tableName, scan,
-        RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
-    job.setNumReduceTasks(0);
-    return job;
-  }
-
-  private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(
-      String arg, String rangeSwitch) {
-    final String[] ranges = arg.substring(rangeSwitch.length()).split(";");
-    final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
-    for (String range : ranges) {
-      String[] startEnd = range.split(",", 2);
-      if (startEnd.length != 2 || startEnd[1].contains(",")) {
-        printUsage("Please specify range in such format as \"--range=a,b\" " +
-            "or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
-        throw new IllegalArgumentException("Wrong range specification: " + range);
-      }
-      String startKey = startEnd[0];
-      String endKey = startEnd[1];
-      rangeList.add(new MultiRowRangeFilter.RowRange(
-          Bytes.toBytesBinary(startKey), true,
-          Bytes.toBytesBinary(endKey), false));
-    }
-    return rangeList;
-  }
-
-  /**
-   * Sets filter {@link FilterBase} to the {@link Scan} instance.
-   * If provided rowRangeList contains more than one element,
-   * method sets filter which is instance of {@link MultiRowRangeFilter}.
-   * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}.
-   * If rowRangeList contains exactly one element, startRow and stopRow are set to the scan.
-   * @param scan
-   * @param rowRangeList
-   */
-  private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
-    final int size = rowRangeList == null ? 0 : rowRangeList.size();
-    if (size <= 1) {
-      scan.setFilter(new FirstKeyOnlyFilter());
-    }
-    if (size == 1) {
-      MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
-      scan.setStartRow(range.getStartRow()); //inclusive
-      scan.setStopRow(range.getStopRow());   //exclusive
-    } else if (size > 1) {
-      scan.setFilter(new MultiRowRangeFilter(rowRangeList));
-    }
-  }
-
-  /*
-   * @param errorMessage Can attach a message when error occurs.
-   */
-  private static void printUsage(String errorMessage) {
-    System.err.println("ERROR: " + errorMessage);
-    printUsage();
-  }
-
-  /**
-   * Prints usage without error message.
-   * Note that we don't document --expected-count, because it's intended for test.
-   */
-  private static void printUsage() {
-    System.err.println("Usage: RowCounter [options] <tablename> " +
-        "[--starttime=[start] --endtime=[end] " +
-        "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
-    System.err.println("For performance consider the following options:\n"
-        + "-Dhbase.client.scanner.caching=100\n"
-        + "-Dmapreduce.map.speculative=false");
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 1) {
-      printUsage("Wrong number of parameters: " + args.length);
-      return -1;
-    }
-    Job job = createSubmittableJob(getConf(), args);
-    if (job == null) {
-      return -1;
-    }
-    boolean success = job.waitForCompletion(true);
-    final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1);
-    if (success && expectedCount != -1) {
-      final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
-      success = expectedCount == counter.getValue();
-      if (!success) {
-        LOG.error("Failing job because count of '" + counter.getValue() +
-            "' does not match expected count of '" + expectedCount + "'");
-      }
-    }
-    return (success ? 0 : 1);
-  }
-
-  /**
-   * Main entry point.
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int errCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(), args);
-    System.exit(errCode);
-  }
-
-}
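
Context: RowCounter implements Tool, so it can be driven from the command line or programmatically through ToolRunner. A minimal invocation sketch follows; the table name, time range, row range and column are illustrative values, not part of this patch.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.RowCounter;
import org.apache.hadoop.util.ToolRunner;

public class RowCounterInvocation {
  public static void main(String[] args) throws Exception {
    // Roughly equivalent to running the class from the shell, e.g.:
    //   hbase org.apache.hadoop.hbase.mapreduce.RowCounter example_table \
    //       --starttime=0 --endtime=1503700000000 --range=a,z cf:qual
    int exitCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(),
        new String[] { "example_table", "--starttime=0", "--endtime=1503700000000",
            "--range=a,z", "cf:qual" });
    System.exit(exitCode);
  }
}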
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
deleted file mode 100644
index 4ba1088..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * A partitioner that takes start and end keys and uses bigdecimal to figure
- * which reduce a key belongs to. Pass the start and end
- * keys in the Configuration using hbase.simpletotalorder.start
- * and hbase.simpletotalorder.end. The end key needs to be
- * exclusive; i.e. one larger than the biggest key in your key space.
- * You may be surprised at how this class partitions the space; it may not
- * align with preconceptions; e.g. a start key of zero and an end key of 100
- * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
- * Make your own partitioner if you need the region spacing to come out a
- * particular way.
- * @param <VALUE>
- * @see #START
- * @see #END
- */
-@InterfaceAudience.Public
-public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
-implements Configurable {
-  private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
-
-  @Deprecated
-  public static final String START = "hbase.simpletotalorder.start";
-  @Deprecated
-  public static final String END = "hbase.simpletotalorder.end";
-
-  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
-  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
-
-  private Configuration c;
-  private byte [] startkey;
-  private byte [] endkey;
-  private byte [][] splits;
-  private int lastReduces = -1;
-
-  public static void setStartKey(Configuration conf, byte[] startKey) {
-    conf.set(START_BASE64, Base64.encodeBytes(startKey));
-  }
-
-  public static void setEndKey(Configuration conf, byte[] endKey) {
-    conf.set(END_BASE64, Base64.encodeBytes(endKey));
-  }
-
-  @SuppressWarnings("deprecation")
-  static byte[] getStartKey(Configuration conf) {
-    return getKeyFromConf(conf, START_BASE64, START);
-  }
-
-  @SuppressWarnings("deprecation")
-  static byte[] getEndKey(Configuration conf) {
-    return getKeyFromConf(conf, END_BASE64, END);
-  }
-
-  private static byte[] getKeyFromConf(Configuration conf,
-      String base64Key, String deprecatedKey) {
-    String encoded = conf.get(base64Key);
-    if (encoded != null) {
-      return Base64.decode(encoded);
-    }
-    String oldStyleVal = conf.get(deprecatedKey);
-    if (oldStyleVal == null) {
-      return null;
-    }
-    LOG.warn("Using deprecated configuration " + deprecatedKey +
-        " - please use static accessor methods instead.");
-    return Bytes.toBytesBinary(oldStyleVal);
-  }
-
-  @Override
-  public int getPartition(final ImmutableBytesWritable key, final VALUE value,
-      final int reduces) {
-    if (reduces == 1) return 0;
-    if (this.lastReduces != reduces) {
-      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
-      for (int i = 0; i < splits.length; i++) {
-        LOG.info(Bytes.toStringBinary(splits[i]));
-      }
-      this.lastReduces = reduces;
-    }
-    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
-        key.getLength());
-    // Below code is from hfile index search.
-    if (pos < 0) {
-      pos++;
-      pos *= -1;
-      if (pos == 0) {
-        // falls before the beginning of the file.
-        throw new RuntimeException("Key outside start/stop range: " +
-            key.toString());
-      }
-      pos--;
-    }
-    return pos;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return this.c;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.c = conf;
-    this.startkey = getStartKey(conf);
-    this.endkey = getEndKey(conf);
-    if (startkey == null || endkey == null) {
-      throw new RuntimeException(this.getClass() + " not configured");
-    }
-    LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
-        ", endkey=" + Bytes.toStringBinary(endkey));
-    // Reset last reduces count on change of Start / End key
-    this.lastReduces = -1;
-  }
-}
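
Context: SimpleTotalOrderPartitioner is configured through the static setStartKey/setEndKey helpers (or the deprecated hbase.simpletotalorder.start/end properties) before the Job copies the Configuration; the start/end key range is then split evenly across the reducers. A short configuration sketch follows; the keys, job name and reducer count are illustrative assumptions, not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class SimpleTotalOrderPartitionerSetup {

  /** Builds a Job whose reduce input is range-partitioned by row key. */
  public static Job configure() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Start key is inclusive; end key is exclusive (one past the largest key in the key space).
    // Both must be set on the Configuration before Job.getInstance() clones it.
    SimpleTotalOrderPartitioner.setStartKey(conf, Bytes.toBytes("row-000"));
    SimpleTotalOrderPartitioner.setEndKey(conf, Bytes.toBytes("row-999~"));
    Job job = Job.getInstance(conf, "total-order-example");
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    // The partitioner only inspects the row key, so any map output value type may be used.
    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    job.setNumReduceTasks(8);   // the start/end key range is split evenly across 8 reducers
    return job;
  }

  public static void main(String[] args) throws Exception {
    configure();
  }
}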