From: busbey@apache.org
To: commits@hbase.apache.org
Date: Sun, 27 Aug 2017 05:33:39 -0000
Subject: [39/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java new file mode 100644 index 0000000..403051f --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java @@ -0,0 +1,410 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClientSideRegionScanner; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.io.Writable; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Hadoop MR API-agnostic implementation for mapreduce over table snapshots. + */ +@InterfaceAudience.Private +public class TableSnapshotInputFormatImpl { + // TODO: Snapshots files are owned in fs by the hbase user. There is no + // easy way to delegate access. + + public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class); + + private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name"; + // key for specifying the root dir of the restored snapshot + protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir"; + + /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */ + private static final String LOCALITY_CUTOFF_MULTIPLIER = + "hbase.tablesnapshotinputformat.locality.cutoff.multiplier"; + private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f; + + /** + * Implementation class for InputSplit logic common between mapred and mapreduce. + */ + public static class InputSplit implements Writable { + + private TableDescriptor htd; + private HRegionInfo regionInfo; + private String[] locations; + private String scan; + private String restoreDir; + + // constructor for mapreduce framework / Writable + public InputSplit() {} + + public InputSplit(TableDescriptor htd, HRegionInfo regionInfo, List locations, + Scan scan, Path restoreDir) { + this.htd = htd; + this.regionInfo = regionInfo; + if (locations == null || locations.isEmpty()) { + this.locations = new String[0]; + } else { + this.locations = locations.toArray(new String[locations.size()]); + } + try { + this.scan = scan != null ? 
TableMapReduceUtil.convertScanToString(scan) : ""; + } catch (IOException e) { + LOG.warn("Failed to convert Scan to String", e); + } + + this.restoreDir = restoreDir.toString(); + } + + public TableDescriptor getHtd() { + return htd; + } + + public String getScan() { + return scan; + } + + public String getRestoreDir() { + return restoreDir; + } + + public long getLength() { + //TODO: We can obtain the file sizes of the snapshot here. + return 0; + } + + public String[] getLocations() { + return locations; + } + + public TableDescriptor getTableDescriptor() { + return htd; + } + + public HRegionInfo getRegionInfo() { + return regionInfo; + } + + // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of + // doing this wrapping with Writables. + @Override + public void write(DataOutput out) throws IOException { + TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder() + .setTable(ProtobufUtil.toTableSchema(htd)) + .setRegion(HRegionInfo.convert(regionInfo)); + + for (String location : locations) { + builder.addLocations(location); + } + + TableSnapshotRegionSplit split = builder.build(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + split.writeTo(baos); + baos.close(); + byte[] buf = baos.toByteArray(); + out.writeInt(buf.length); + out.write(buf); + + Bytes.writeByteArray(out, Bytes.toBytes(scan)); + Bytes.writeByteArray(out, Bytes.toBytes(restoreDir)); + + } + + @Override + public void readFields(DataInput in) throws IOException { + int len = in.readInt(); + byte[] buf = new byte[len]; + in.readFully(buf); + TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf); + this.htd = ProtobufUtil.toTableDescriptor(split.getTable()); + this.regionInfo = HRegionInfo.convert(split.getRegion()); + List locationsList = split.getLocationsList(); + this.locations = locationsList.toArray(new String[locationsList.size()]); + + this.scan = Bytes.toString(Bytes.readByteArray(in)); + this.restoreDir = Bytes.toString(Bytes.readByteArray(in)); + } + } + + /** + * Implementation class for RecordReader logic common between mapred and mapreduce. 
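+ * Rows are read through a {@link ClientSideRegionScanner} opened over the region files restored under the split's restore directory, so scanning does not go through a RegionServer.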
+ */ + public static class RecordReader { + private InputSplit split; + private Scan scan; + private Result result = null; + private ImmutableBytesWritable row = null; + private ClientSideRegionScanner scanner; + + public ClientSideRegionScanner getScanner() { + return scanner; + } + + public void initialize(InputSplit split, Configuration conf) throws IOException { + this.scan = TableMapReduceUtil.convertStringToScan(split.getScan()); + this.split = split; + TableDescriptor htd = split.htd; + HRegionInfo hri = this.split.getRegionInfo(); + FileSystem fs = FSUtils.getCurrentFileSystem(conf); + + + // region is immutable, this should be fine, + // otherwise we have to set the thread read point + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + // disable caching of data blocks + scan.setCacheBlocks(false); + + scanner = + new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null); + } + + public boolean nextKeyValue() throws IOException { + result = scanner.next(); + if (result == null) { + //we are done + return false; + } + + if (this.row == null) { + this.row = new ImmutableBytesWritable(); + } + this.row.set(result.getRow()); + return true; + } + + public ImmutableBytesWritable getCurrentKey() { + return row; + } + + public Result getCurrentValue() { + return result; + } + + public long getPos() { + return 0; + } + + public float getProgress() { + return 0; // TODO: use total bytes to estimate + } + + public void close() { + if (this.scanner != null) { + this.scanner.close(); + } + } + } + + public static List getSplits(Configuration conf) throws IOException { + String snapshotName = getSnapshotName(conf); + + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs); + + List regionInfos = getRegionInfosFromManifest(manifest); + + // TODO: mapred does not support scan as input API. Work around for now. 
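+ // extractScanFromConf falls back to building a Scan from the mapred COLUMN_LIST property when no serialized Scan is present in the configuration.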
+ Scan scan = extractScanFromConf(conf); + // the temp dir where the snapshot is restored + Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY)); + + return getSplits(scan, manifest, regionInfos, restoreDir, conf); + } + + public static List getRegionInfosFromManifest(SnapshotManifest manifest) { + List regionManifests = manifest.getRegionManifests(); + if (regionManifests == null) { + throw new IllegalArgumentException("Snapshot seems empty"); + } + + List regionInfos = Lists.newArrayListWithCapacity(regionManifests.size()); + + for (SnapshotRegionManifest regionManifest : regionManifests) { + HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo()); + if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) { + continue; + } + regionInfos.add(hri); + } + return regionInfos; + } + + public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName, + Path rootDir, FileSystem fs) throws IOException { + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); + } + + public static Scan extractScanFromConf(Configuration conf) throws IOException { + Scan scan = null; + if (conf.get(TableInputFormat.SCAN) != null) { + scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); + } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) { + String[] columns = + conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" "); + scan = new Scan(); + for (String col : columns) { + scan.addFamily(Bytes.toBytes(col)); + } + } else { + throw new IllegalArgumentException("Unable to create scan"); + } + return scan; + } + + public static List getSplits(Scan scan, SnapshotManifest manifest, + List regionManifests, Path restoreDir, Configuration conf) throws IOException { + // load table descriptor + TableDescriptor htd = manifest.getTableDescriptor(); + + Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName()); + + List splits = new ArrayList<>(); + for (HRegionInfo hri : regionManifests) { + // load region descriptor + + if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(), + hri.getEndKey())) { + // compute HDFS locations from snapshot files (which will get the locations for + // referred hfiles) + List hosts = getBestLocations(conf, + HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); + + int len = Math.min(3, hosts.size()); + hosts = hosts.subList(0, len); + splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); + } + } + + return splits; + + } + + /** + * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take + * weights into account, thus will treat every location passed from the input split as equal. We + * do not want to blindly pass all the locations, since we are creating one split per region, and + * the region's blocks are all distributed throughout the cluster unless favorite node assignment + * is used. On the expected stable case, only one location will contain most of the blocks as + * local. + * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. 
Here + * we are doing a simple heuristic, where we will pass all hosts which have at least 80% + * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top + * host with the best locality. + */ + public static List getBestLocations( + Configuration conf, HDFSBlocksDistribution blockDistribution) { + List locations = new ArrayList<>(3); + + HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights(); + + if (hostAndWeights.length == 0) { + return locations; + } + + HostAndWeight topHost = hostAndWeights[0]; + locations.add(topHost.getHost()); + + // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality + double cutoffMultiplier + = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER); + + double filterWeight = topHost.getWeight() * cutoffMultiplier; + + for (int i = 1; i < hostAndWeights.length; i++) { + if (hostAndWeights[i].getWeight() >= filterWeight) { + locations.add(hostAndWeights[i].getHost()); + } else { + break; + } + } + + return locations; + } + + private static String getSnapshotName(Configuration conf) { + String snapshotName = conf.get(SNAPSHOT_NAME_KEY); + if (snapshotName == null) { + throw new IllegalArgumentException("Snapshot name must be provided"); + } + return snapshotName; + } + + /** + * Configures the job to use TableSnapshotInputFormat to read from a snapshot. + * @param conf the job to configuration + * @param snapshotName the name of the snapshot to read from + * @param restoreDir a temporary directory to restore the snapshot into. Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. + * After the job is finished, restoreDir can be deleted. + * @throws IOException if an error occurs + */ + public static void setInput(Configuration conf, String snapshotName, Path restoreDir) + throws IOException { + conf.set(SNAPSHOT_NAME_KEY, snapshotName); + + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + restoreDir = new Path(restoreDir, UUID.randomUUID().toString()); + + // TODO: restore from record readers to parallelize. + RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); + + conf.set(RESTORE_DIR_KEY, restoreDir.toString()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java new file mode 100644 index 0000000..13c7c67 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java @@ -0,0 +1,395 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * A table split corresponds to a key range (low, high) and an optional scanner. + * All references to row below refer to the key of the row. + */ +@InterfaceAudience.Public +public class TableSplit extends InputSplit +implements Writable, Comparable { + /** @deprecated LOG variable would be made private. fix in hbase 3.0 */ + @Deprecated + public static final Log LOG = LogFactory.getLog(TableSplit.class); + + // should be < 0 (@see #readFields(DataInput)) + // version 1 supports Scan data member + enum Version { + UNVERSIONED(0), + // Initial number we put on TableSplit when we introduced versioning. + INITIAL(-1), + // Added an encoded region name field for easier identification of split -> region + WITH_ENCODED_REGION_NAME(-2); + + final int code; + static final Version[] byCode; + static { + byCode = Version.values(); + for (int i = 0; i < byCode.length; i++) { + if (byCode[i].code != -1 * i) { + throw new AssertionError("Values in this enum should be descending by one"); + } + } + } + + Version(int code) { + this.code = code; + } + + boolean atLeast(Version other) { + return code <= other.code; + } + + static Version fromCode(int code) { + return byCode[code * -1]; + } + } + + private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME; + private TableName tableName; + private byte [] startRow; + private byte [] endRow; + private String regionLocation; + private String encodedRegionName = ""; + private String scan = ""; // stores the serialized form of the Scan + private long length; // Contains estimation of region size in bytes + + /** Default constructor. */ + public TableSplit() { + this((TableName)null, null, HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, ""); + } + + /** + * Creates a new instance while assigning all variables. + * Length of region is set to 0 + * Encoded name of the region is set to blank + * + * @param tableName The name of the current table. + * @param scan The scan associated with this split. + * @param startRow The start row of the split. + * @param endRow The end row of the split. + * @param location The location of the region. + */ + public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow, + final String location) { + this(tableName, scan, startRow, endRow, location, 0L); + } + + /** + * Creates a new instance while assigning all variables. + * Encoded name of region is set to blank + * + * @param tableName The name of the current table. 
+ * @param scan The scan associated with this split. + * @param startRow The start row of the split. + * @param endRow The end row of the split. + * @param location The location of the region. + */ + public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow, + final String location, long length) { + this(tableName, scan, startRow, endRow, location, "", length); + } + + /** + * Creates a new instance while assigning all variables. + * + * @param tableName The name of the current table. + * @param scan The scan associated with this split. + * @param startRow The start row of the split. + * @param endRow The end row of the split. + * @param encodedRegionName The region ID. + * @param location The location of the region. + */ + public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow, + final String location, final String encodedRegionName, long length) { + this.tableName = tableName; + try { + this.scan = + (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan); + } catch (IOException e) { + LOG.warn("Failed to convert Scan to String", e); + } + this.startRow = startRow; + this.endRow = endRow; + this.regionLocation = location; + this.encodedRegionName = encodedRegionName; + this.length = length; + } + + /** + * Creates a new instance without a scanner. + * Length of region is set to 0 + * + * @param tableName The name of the current table. + * @param startRow The start row of the split. + * @param endRow The end row of the split. + * @param location The location of the region. + */ + public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, + final String location) { + this(tableName, null, startRow, endRow, location); + } + + /** + * Creates a new instance without a scanner. + * + * @param tableName The name of the current table. + * @param startRow The start row of the split. + * @param endRow The end row of the split. + * @param location The location of the region. + * @param length Size of region in bytes + */ + public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, + final String location, long length) { + this(tableName, null, startRow, endRow, location, length); + } + + /** + * Returns a Scan object from the stored string representation. + * + * @return Returns a Scan object based on the stored scanner. + * @throws IOException + */ + public Scan getScan() throws IOException { + return TableMapReduceUtil.convertStringToScan(this.scan); + } + + /** + * Returns the table name converted to a byte array. + * @see #getTable() + * @return The table name. + */ + public byte [] getTableName() { + return tableName.getName(); + } + + /** + * Returns the table name. + * + * @return The table name. + */ + public TableName getTable() { + // It is ugly that usually to get a TableName, the method is called getTableName. We can't do + // that in here though because there was an existing getTableName in place already since + // deprecated. + return tableName; + } + + /** + * Returns the start row. + * + * @return The start row. + */ + public byte [] getStartRow() { + return startRow; + } + + /** + * Returns the end row. + * + * @return The end row. + */ + public byte [] getEndRow() { + return endRow; + } + + /** + * Returns the region location. + * + * @return The region's location. + */ + public String getRegionLocation() { + return regionLocation; + } + + /** + * Returns the region's location as an array. + * + * @return The array containing the region location. 
+ * @see org.apache.hadoop.mapreduce.InputSplit#getLocations() + */ + @Override + public String[] getLocations() { + return new String[] {regionLocation}; + } + + /** + * Returns the region's encoded name. + * + * @return The region's encoded name. + */ + public String getEncodedRegionName() { + return encodedRegionName; + } + + /** + * Returns the length of the split. + * + * @return The length of the split. + * @see org.apache.hadoop.mapreduce.InputSplit#getLength() + */ + @Override + public long getLength() { + return length; + } + + /** + * Reads the values of each field. + * + * @param in The input to read from. + * @throws IOException When reading the input fails. + */ + @Override + public void readFields(DataInput in) throws IOException { + Version version = Version.UNVERSIONED; + // TableSplit was not versioned in the beginning. + // In order to introduce it now, we make use of the fact + // that tableName was written with Bytes.writeByteArray, + // which encodes the array length as a vint which is >= 0. + // Hence if the vint is >= 0 we have an old version and the vint + // encodes the length of tableName. + // If < 0 we just read the version and the next vint is the length. + // @see Bytes#readByteArray(DataInput) + int len = WritableUtils.readVInt(in); + if (len < 0) { + // what we just read was the version + version = Version.fromCode(len); + len = WritableUtils.readVInt(in); + } + byte[] tableNameBytes = new byte[len]; + in.readFully(tableNameBytes); + tableName = TableName.valueOf(tableNameBytes); + startRow = Bytes.readByteArray(in); + endRow = Bytes.readByteArray(in); + regionLocation = Bytes.toString(Bytes.readByteArray(in)); + if (version.atLeast(Version.INITIAL)) { + scan = Bytes.toString(Bytes.readByteArray(in)); + } + length = WritableUtils.readVLong(in); + if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) { + encodedRegionName = Bytes.toString(Bytes.readByteArray(in)); + } + } + + /** + * Writes the field values to the output. + * + * @param out The output to write to. + * @throws IOException When writing the values to the output fails. + */ + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, VERSION.code); + Bytes.writeByteArray(out, tableName.getName()); + Bytes.writeByteArray(out, startRow); + Bytes.writeByteArray(out, endRow); + Bytes.writeByteArray(out, Bytes.toBytes(regionLocation)); + Bytes.writeByteArray(out, Bytes.toBytes(scan)); + WritableUtils.writeVLong(out, length); + Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName)); + } + + /** + * Returns the details about this instance as a string. + * + * @return The values of this instance as a string. 
+ * @see java.lang.Object#toString() + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("HBase table split("); + sb.append("table name: ").append(tableName); + // null scan input is represented by "" + String printScan = ""; + if (!scan.equals("")) { + try { + // get the real scan here in toString, not the Base64 string + printScan = TableMapReduceUtil.convertStringToScan(scan).toString(); + } + catch (IOException e) { + printScan = ""; + } + } + sb.append(", scan: ").append(printScan); + sb.append(", start row: ").append(Bytes.toStringBinary(startRow)); + sb.append(", end row: ").append(Bytes.toStringBinary(endRow)); + sb.append(", region location: ").append(regionLocation); + sb.append(", encoded region name: ").append(encodedRegionName); + sb.append(")"); + return sb.toString(); + } + + /** + * Compares this split against the given one. + * + * @param split The split to compare to. + * @return The result of the comparison. + * @see java.lang.Comparable#compareTo(java.lang.Object) + */ + @Override + public int compareTo(TableSplit split) { + // If The table name of the two splits is the same then compare start row + // otherwise compare based on table names + int tableNameComparison = + getTable().compareTo(split.getTable()); + return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo( + getStartRow(), split.getStartRow()); + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof TableSplit)) { + return false; + } + return tableName.equals(((TableSplit)o).tableName) && + Bytes.equals(startRow, ((TableSplit)o).startRow) && + Bytes.equals(endRow, ((TableSplit)o).endRow) && + regionLocation.equals(((TableSplit)o).regionLocation); + } + + @Override + public int hashCode() { + int result = tableName != null ? tableName.hashCode() : 0; + result = 31 * result + (scan != null ? scan.hashCode() : 0); + result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0); + result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0); + result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0); + result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java new file mode 100644 index 0000000..30cd461 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ArrayBackedTag; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.security.visibility.InvalidLabelException; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.StringUtils; + +/** + * Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit. + * @see HFileOutputFormat2 + * @see KeyValueSortReducer + * @see PutSortReducer + */ +@InterfaceAudience.Public +public class TextSortReducer extends + Reducer { + + /** Timestamp for all inserted rows */ + private long ts; + + /** Column seperator */ + private String separator; + + /** Should skip bad lines */ + private boolean skipBadLines; + + private Counter badLineCount; + + private ImportTsv.TsvParser parser; + + /** Cell visibility expr **/ + private String cellVisibilityExpr; + + /** Cell TTL */ + private long ttl; + + private CellCreator kvCreator; + + public long getTs() { + return ts; + } + + public boolean getSkipBadLines() { + return skipBadLines; + } + + public Counter getBadLineCount() { + return badLineCount; + } + + public void incrementBadLineCount(int count) { + this.badLineCount.increment(count); + } + + /** + * Handles initializing this class with objects specific to it (i.e., the parser). + * Common initialization that might be leveraged by a subsclass is done in + * doSetup. Hence a subclass may choose to override this method + * and call doSetup as well before handling it's own custom params. + * + * @param context + */ + @Override + protected void setup(Context context) { + Configuration conf = context.getConfiguration(); + doSetup(context, conf); + + parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator); + if (parser.getRowKeyColumnIndex() == -1) { + throw new RuntimeException("No row key column specified"); + } + this.kvCreator = new CellCreator(conf); + } + + /** + * Handles common parameter initialization that a subclass might want to leverage. + * @param context + * @param conf + */ + protected void doSetup(Context context, Configuration conf) { + // If a custom separator has been used, + // decode it back from Base64 encoding. + separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); + if (separator == null) { + separator = ImportTsv.DEFAULT_SEPARATOR; + } else { + separator = new String(Base64.decode(separator)); + } + + // Should never get 0 as we are setting this to a valid value in job configuration. 
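+ // This value becomes the default timestamp for lines that do not carry their own timestamp column.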
+ ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0); + + skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true); + badLineCount = context.getCounter("ImportTsv", "Bad Lines"); + } + + @Override + protected void reduce( + ImmutableBytesWritable rowKey, + java.lang.Iterable lines, + Reducer.Context context) + throws java.io.IOException, InterruptedException + { + // although reduce() is called per-row, handle pathological case + long threshold = context.getConfiguration().getLong( + "reducer.row.threshold", 1L * (1<<30)); + Iterator iter = lines.iterator(); + while (iter.hasNext()) { + Set kvs = new TreeSet<>(CellComparator.COMPARATOR); + long curSize = 0; + // stop at the end or the RAM threshold + while (iter.hasNext() && curSize < threshold) { + Text line = iter.next(); + byte[] lineBytes = line.getBytes(); + try { + ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength()); + // Retrieve timestamp if exists + ts = parsed.getTimestamp(ts); + cellVisibilityExpr = parsed.getCellVisibility(); + ttl = parsed.getCellTTL(); + + // create tags for the parsed line + List tags = new ArrayList<>(); + if (cellVisibilityExpr != null) { + tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags( + cellVisibilityExpr)); + } + // Add TTL directly to the KV so we can vary them when packing more than one KV + // into puts + if (ttl > 0) { + tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); + } + for (int i = 0; i < parsed.getColumnCount(); i++) { + if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() + || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex() + || i == parser.getCellTTLColumnIndex()) { + continue; + } + // Creating the KV which needs to be directly written to HFiles. Using the Facade + // KVCreator for creation of kvs. + Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), + parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length, + parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes, + parsed.getColumnOffset(i), parsed.getColumnLength(i), tags); + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + kvs.add(kv); + curSize += kv.heapSize(); + } + } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException + | InvalidLabelException badLine) { + if (skipBadLines) { + System.err.println("Bad line." 
+ badLine.getMessage()); + incrementBadLineCount(1); + continue; + } + throw new IOException(badLine); + } + } + context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass() + + "(" + StringUtils.humanReadableInt(curSize) + ")"); + int index = 0; + for (KeyValue kv : kvs) { + context.write(rowKey, kv); + if (++index > 0 && index % 100 == 0) + context.setStatus("Wrote " + index + " key values."); + } + + // if we have more entries to process + if (iter.hasNext()) { + // force flush because we cannot guarantee intra-row sorted order + context.write(null, null); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java new file mode 100644 index 0000000..3c507b3 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ArrayBackedTag; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; +import org.apache.hadoop.hbase.security.visibility.CellVisibility; +import org.apache.hadoop.hbase.security.visibility.InvalidLabelException; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Mapper; + +/** + * Write table content out to files in hdfs. 
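+ * Each input line is parsed with {@link ImportTsv.TsvParser} and emitted as a {@link Put} keyed by the parsed row; when a bulk-load output path is configured, visibility and TTL tags are attached to the created cells.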
+ */ +@InterfaceAudience.Public +public class TsvImporterMapper +extends Mapper +{ + + /** Timestamp for all inserted rows */ + protected long ts; + + /** Column seperator */ + private String separator; + + /** Should skip bad lines */ + private boolean skipBadLines; + /** Should skip empty columns*/ + private boolean skipEmptyColumns; + private Counter badLineCount; + private boolean logBadLines; + + protected ImportTsv.TsvParser parser; + + protected Configuration conf; + + protected String cellVisibilityExpr; + + protected long ttl; + + protected CellCreator kvCreator; + + private String hfileOutPath; + + /** List of cell tags */ + private List tags; + + public long getTs() { + return ts; + } + + public boolean getSkipBadLines() { + return skipBadLines; + } + + public Counter getBadLineCount() { + return badLineCount; + } + + public void incrementBadLineCount(int count) { + this.badLineCount.increment(count); + } + + /** + * Handles initializing this class with objects specific to it (i.e., the parser). + * Common initialization that might be leveraged by a subsclass is done in + * doSetup. Hence a subclass may choose to override this method + * and call doSetup as well before handling it's own custom params. + * + * @param context + */ + @Override + protected void setup(Context context) { + doSetup(context); + + conf = context.getConfiguration(); + parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), + separator); + if (parser.getRowKeyColumnIndex() == -1) { + throw new RuntimeException("No row key column specified"); + } + this.kvCreator = new CellCreator(conf); + tags = new ArrayList<>(); + } + + /** + * Handles common parameter initialization that a subclass might want to leverage. + * @param context + */ + protected void doSetup(Context context) { + Configuration conf = context.getConfiguration(); + + // If a custom separator has been used, + // decode it back from Base64 encoding. + separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); + if (separator == null) { + separator = ImportTsv.DEFAULT_SEPARATOR; + } else { + separator = new String(Base64.decode(separator)); + } + // Should never get 0 as we are setting this to a valid value in job + // configuration. + ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0); + + skipEmptyColumns = context.getConfiguration().getBoolean( + ImportTsv.SKIP_EMPTY_COLUMNS, false); + skipBadLines = context.getConfiguration().getBoolean( + ImportTsv.SKIP_LINES_CONF_KEY, true); + badLineCount = context.getCounter("ImportTsv", "Bad Lines"); + logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false); + hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY); + } + + /** + * Convert a line of TSV text into an HBase table row. 
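+ * Lines that fail to parse are counted against the "Bad Lines" counter and are either skipped or rethrown as an IOException, depending on the skip-bad-lines setting.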
+ */ + @Override + public void map(LongWritable offset, Text value, + Context context) + throws IOException { + byte[] lineBytes = value.getBytes(); + + try { + ImportTsv.TsvParser.ParsedLine parsed = parser.parse( + lineBytes, value.getLength()); + ImmutableBytesWritable rowKey = + new ImmutableBytesWritable(lineBytes, + parsed.getRowKeyOffset(), + parsed.getRowKeyLength()); + // Retrieve timestamp if exists + ts = parsed.getTimestamp(ts); + cellVisibilityExpr = parsed.getCellVisibility(); + ttl = parsed.getCellTTL(); + + // create tags for the parsed line + if (hfileOutPath != null) { + tags.clear(); + if (cellVisibilityExpr != null) { + tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags( + cellVisibilityExpr)); + } + // Add TTL directly to the KV so we can vary them when packing more than one KV + // into puts + if (ttl > 0) { + tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); + } + } + Put put = new Put(rowKey.copyBytes()); + for (int i = 0; i < parsed.getColumnCount(); i++) { + if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() + || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex() + || i == parser.getCellTTLColumnIndex() || (skipEmptyColumns + && parsed.getColumnLength(i) == 0)) { + continue; + } + populatePut(lineBytes, parsed, put, i); + } + context.write(rowKey, put); + } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException + | InvalidLabelException badLine) { + if (logBadLines) { + System.err.println(value); + } + System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage()); + if (skipBadLines) { + incrementBadLineCount(1); + return; + } + throw new IOException(badLine); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put, + int i) throws BadTsvLineException, IOException { + Cell cell = null; + if (hfileOutPath == null) { + cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), + parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, + parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, + parsed.getColumnOffset(i), parsed.getColumnLength(i)); + if (cellVisibilityExpr != null) { + // We won't be validating the expression here. The Visibility CP will do + // the validation + put.setCellVisibility(new CellVisibility(cellVisibilityExpr)); + } + if (ttl > 0) { + put.setTTL(ttl); + } + } else { + // Creating the KV which needs to be directly written to HFiles. Using the Facade + // KVCreator for creation of kvs. 
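+ // In this bulk-load branch the visibility and TTL tags collected in map() are carried inside the created cell itself instead of being set on the Put.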
+ cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), + parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, + parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i), + parsed.getColumnLength(i), tags); + } + put.add(cell); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java new file mode 100644 index 0000000..a3b095c --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; + +/** + * Write table content out to map output files. + */ +@InterfaceAudience.Public +public class TsvImporterTextMapper +extends Mapper +{ + + /** Column seperator */ + private String separator; + + /** Should skip bad lines */ + private boolean skipBadLines; + private Counter badLineCount; + private boolean logBadLines; + + private ImportTsv.TsvParser parser; + + public boolean getSkipBadLines() { + return skipBadLines; + } + + public Counter getBadLineCount() { + return badLineCount; + } + + public void incrementBadLineCount(int count) { + this.badLineCount.increment(count); + } + + /** + * Handles initializing this class with objects specific to it (i.e., the parser). + * Common initialization that might be leveraged by a subsclass is done in + * doSetup. Hence a subclass may choose to override this method + * and call doSetup as well before handling it's own custom params. 
+ * + * @param context + */ + @Override + protected void setup(Context context) { + doSetup(context); + + Configuration conf = context.getConfiguration(); + + parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator); + if (parser.getRowKeyColumnIndex() == -1) { + throw new RuntimeException("No row key column specified"); + } + } + + /** + * Handles common parameter initialization that a subclass might want to leverage. + * @param context + */ + protected void doSetup(Context context) { + Configuration conf = context.getConfiguration(); + + // If a custom separator has been used, + // decode it back from Base64 encoding. + separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); + if (separator == null) { + separator = ImportTsv.DEFAULT_SEPARATOR; + } else { + separator = new String(Base64.decode(separator)); + } + + skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true); + logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false); + badLineCount = context.getCounter("ImportTsv", "Bad Lines"); + } + + /** + * Convert a line of TSV text into an HBase table row. + */ + @Override + public void map(LongWritable offset, Text value, Context context) throws IOException { + try { + Pair rowKeyOffests = parser.parseRowKey(value.getBytes(), value.getLength()); + ImmutableBytesWritable rowKey = new ImmutableBytesWritable( + value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond()); + context.write(rowKey, value); + } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) { + if (logBadLines) { + System.err.println(value); + } + System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage()); + if (skipBadLines) { + incrementBadLineCount(1); + return; + } + throw new IOException(badLine); + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java new file mode 100644 index 0000000..a83a88f --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hbase.Tag; + +/** + * Interface to convert visibility expressions into Tags for storing along with Cells in HFiles. + */ +@InterfaceAudience.Public +public interface VisibilityExpressionResolver extends Configurable { + + /** + * Giving a chance for the initialization. + */ + void init(); + + /** + * Convert visibility expression into tags to be serialized. + * @param visExpression the label expression + * @return The list of tags corresponds to the visibility expression. These tags will be stored + * along with the Cells. + */ + List createVisibilityExpTags(String visExpression) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java new file mode 100644 index 0000000..8b4e967 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.util.StringUtils; + +/** + * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files. + */ +@InterfaceAudience.Public +public class WALInputFormat extends InputFormat { + private static final Log LOG = LogFactory.getLog(WALInputFormat.class); + + public static final String START_TIME_KEY = "wal.start.time"; + public static final String END_TIME_KEY = "wal.end.time"; + + /** + * {@link InputSplit} for {@link WAL} files. Each split represent + * exactly one log file. + */ + static class WALSplit extends InputSplit implements Writable { + private String logFileName; + private long fileSize; + private long startTime; + private long endTime; + + /** for serialization */ + public WALSplit() {} + + /** + * Represent an WALSplit, i.e. a single WAL file. + * Start- and EndTime are managed by the split, so that WAL files can be + * filtered before WALEdits are passed to the mapper(s). + * @param logFileName + * @param fileSize + * @param startTime + * @param endTime + */ + public WALSplit(String logFileName, long fileSize, long startTime, long endTime) { + this.logFileName = logFileName; + this.fileSize = fileSize; + this.startTime = startTime; + this.endTime = endTime; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return fileSize; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + // TODO: Find the data node with the most blocks for this WAL? 
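+ // An empty array gives the MR scheduler no locality hint for this split.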
+ return new String[] {}; + } + + public String getLogFileName() { + return logFileName; + } + + public long getStartTime() { + return startTime; + } + + public long getEndTime() { + return endTime; + } + + @Override + public void readFields(DataInput in) throws IOException { + logFileName = in.readUTF(); + fileSize = in.readLong(); + startTime = in.readLong(); + endTime = in.readLong(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(logFileName); + out.writeLong(fileSize); + out.writeLong(startTime); + out.writeLong(endTime); + } + + @Override + public String toString() { + return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize; + } + } + + /** + * {@link RecordReader} for an {@link WAL} file. + * Implementation shared with deprecated HLogInputFormat. + */ + static abstract class WALRecordReader extends RecordReader { + private Reader reader = null; + // visible until we can remove the deprecated HLogInputFormat + Entry currentEntry = new Entry(); + private long startTime; + private long endTime; + private Configuration conf; + private Path logFile; + private long currentPos; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + WALSplit hsplit = (WALSplit)split; + logFile = new Path(hsplit.getLogFileName()); + conf = context.getConfiguration(); + LOG.info("Opening reader for "+split); + openReader(logFile); + this.startTime = hsplit.getStartTime(); + this.endTime = hsplit.getEndTime(); + } + + private void openReader(Path path) throws IOException + { + closeReader(); + reader = AbstractFSWALProvider.openReader(path, conf); + seek(); + setCurrentPath(path); + } + + private void setCurrentPath(Path path) { + this.logFile = path; + } + + private void closeReader() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + } + + private void seek() throws IOException { + if (currentPos != 0) { + reader.seek(currentPos); + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (reader == null) return false; + this.currentPos = reader.getPosition(); + Entry temp; + long i = -1; + try { + do { + // skip older entries + try { + temp = reader.next(currentEntry); + i++; + } catch (EOFException x) { + LOG.warn("Corrupted entry detected. Ignoring the rest of the file." 
+ + " (This is normal when a RegionServer crashed.)"); + return false; + } + } while (temp != null && temp.getKey().getWriteTime() < startTime); + + if (temp == null) { + if (i > 0) LOG.info("Skipped " + i + " entries."); + LOG.info("Reached end of file."); + return false; + } else if (i > 0) { + LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + "."); + } + boolean res = temp.getKey().getWriteTime() <= endTime; + if (!res) { + LOG.info("Reached ts: " + temp.getKey().getWriteTime() + + " ignoring the rest of the file."); + } + return res; + } catch (IOException e) { + Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf); + if (logFile != archivedLog) { + openReader(archivedLog); + // Try call again in recursion + return nextKeyValue(); + } else { + throw e; + } + } + } + + @Override + public WALEdit getCurrentValue() throws IOException, InterruptedException { + return currentEntry.getEdit(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + // N/A depends on total number of entries, which is unknown + return 0; + } + + @Override + public void close() throws IOException { + LOG.info("Closing reader"); + if (reader != null) this.reader.close(); + } + } + + /** + * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer + * need to support HLogInputFormat. + */ + static class WALKeyRecordReader extends WALRecordReader { + @Override + public WALKey getCurrentKey() throws IOException, InterruptedException { + return currentEntry.getKey(); + } + } + + @Override + public List getSplits(JobContext context) throws IOException, + InterruptedException { + return getSplits(context, START_TIME_KEY, END_TIME_KEY); + } + + /** + * implementation shared with deprecated HLogInputFormat + */ + List getSplits(final JobContext context, final String startKey, final String endKey) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false); + Path[] inputPaths = getInputPaths(conf); + long startTime = conf.getLong(startKey, Long.MIN_VALUE); + long endTime = conf.getLong(endKey, Long.MAX_VALUE); + + List allFiles = new ArrayList(); + for(Path inputPath: inputPaths){ + FileSystem fs = inputPath.getFileSystem(conf); + try { + List files = getFiles(fs, inputPath, startTime, endTime); + allFiles.addAll(files); + } catch (FileNotFoundException e) { + if (ignoreMissing) { + LOG.warn("File "+ inputPath +" is missing. 
Skipping it."); + continue; + } + throw e; + } + } + List splits = new ArrayList(allFiles.size()); + for (FileStatus file : allFiles) { + splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); + } + return splits; + } + + private Path[] getInputPaths(Configuration conf) { + String inpDirs = conf.get(FileInputFormat.INPUT_DIR); + return StringUtils.stringToPath( + inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","))); + } + + private List getFiles(FileSystem fs, Path dir, long startTime, long endTime) + throws IOException { + List result = new ArrayList<>(); + LOG.debug("Scanning " + dir.toString() + " for WAL files"); + + RemoteIterator iter = fs.listLocatedStatus(dir); + if (!iter.hasNext()) return Collections.emptyList(); + while (iter.hasNext()) { + LocatedFileStatus file = iter.next(); + if (file.isDirectory()) { + // recurse into sub directories + result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); + } else { + String name = file.getPath().toString(); + int idx = name.lastIndexOf('.'); + if (idx > 0) { + try { + long fileStartTime = Long.parseLong(name.substring(idx+1)); + if (fileStartTime <= endTime) { + LOG.info("Found: " + file); + result.add(file); + } + } catch (NumberFormatException x) { + idx = 0; + } + } + if (idx == 0) { + LOG.warn("File " + name + " does not appear to be an WAL file. Skipping..."); + } + } + } + return result; + } + + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return new WALKeyRecordReader(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java new file mode 100644 index 0000000..b1e655c --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -0,0 +1,384 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A tool to replay WAL files as a M/R job. + * The WAL can be replayed for a set of tables or all tables, + * and a time range can be provided (in milliseconds). + * The WAL is filtered to the passed set of tables and the output + * can optionally be mapped to another set of tables. + * + * WAL replay can also generate HFiles for later bulk importing, + * in that case the WAL is replayed for a single table only. + */ +@InterfaceAudience.Public +public class WALPlayer extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(WALPlayer.class); + final static String NAME = "WALPlayer"; + public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output"; + public final static String TABLES_KEY = "wal.input.tables"; + public final static String TABLE_MAP_KEY = "wal.input.tablesmap"; + public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator"; + public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files"; + + + // This relies on Hadoop Configuration to handle warning about deprecated configs and + // to set the correct non-deprecated configs when an old one shows up. + static { + Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY); + Configuration.addDeprecation("hlog.input.tables", TABLES_KEY); + Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY); + } + + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + + public WALPlayer(){ + } + + protected WALPlayer(final Configuration c) { + super(c); + } + + /** + * A mapper that just writes out KeyValues. 
+ * This one can be used together with {@link KeyValueSortReducer} + */ + static class WALKeyValueMapper + extends Mapper { + private byte[] table; + + @Override + public void map(WALKey key, WALEdit value, + Context context) + throws IOException { + try { + // skip all other tables + if (Bytes.equals(table, key.getTablename().getName())) { + for (Cell cell : value.getCells()) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + if (WALEdit.isMetaEditFamily(kv)) { + continue; + } + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void setup(Context context) throws IOException { + // only a single table is supported when HFiles are generated with HFileOutputFormat + String[] tables = context.getConfiguration().getStrings(TABLES_KEY); + if (tables == null || tables.length != 1) { + // this can only happen when WALMapper is used directly by a class other than WALPlayer + throw new IOException("Exactly one table must be specified for bulk HFile case."); + } + table = Bytes.toBytes(tables[0]); + + } + + } + + /** + * A mapper that writes out {@link Mutation} to be directly applied to + * a running HBase instance. + */ + protected static class WALMapper + extends Mapper { + private Map tables = new TreeMap<>(); + + @Override + public void map(WALKey key, WALEdit value, Context context) + throws IOException { + try { + if (tables.isEmpty() || tables.containsKey(key.getTablename())) { + TableName targetTable = tables.isEmpty() ? + key.getTablename() : + tables.get(key.getTablename()); + ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName()); + Put put = null; + Delete del = null; + Cell lastCell = null; + for (Cell cell : value.getCells()) { + // filtering WAL meta entries + if (WALEdit.isMetaEditFamily(cell)) { + continue; + } + + // Allow a subclass filter out this cell. + if (filter(context, cell)) { + // A WALEdit may contain multiple operations (HBASE-3584) and/or + // multiple rows (HBASE-5229). + // Aggregate as much as possible into a single Put/Delete + // operation before writing to the context. + if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte() + || !CellUtil.matchingRow(lastCell, cell)) { + // row or type changed, write out aggregate KVs. + if (put != null) { + context.write(tableOut, put); + } + if (del != null) { + context.write(tableOut, del); + } + if (CellUtil.isDelete(cell)) { + del = new Delete(CellUtil.cloneRow(cell)); + } else { + put = new Put(CellUtil.cloneRow(cell)); + } + } + if (CellUtil.isDelete(cell)) { + del.add(cell); + } else { + put.add(cell); + } + } + lastCell = cell; + } + // write residual KVs + if (put != null) { + context.write(tableOut, put); + } + if (del != null) { + context.write(tableOut, del); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + protected boolean filter(Context context, final Cell cell) { + return true; + } + + @Override + protected void + cleanup(Mapper.Context context) + throws IOException, InterruptedException { + super.cleanup(context); + } + + @Override + public void setup(Context context) throws IOException { + String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY); + String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY); + if (tableMap == null) { + tableMap = tablesToUse; + } + if (tablesToUse == null) { + // Then user wants all tables. 
+ } else if (tablesToUse.length != tableMap.length) { + // this can only happen when WALMapper is used directly by a class other than WALPlayer + throw new IOException("Incorrect table mapping specified ."); + } + int i = 0; + if (tablesToUse != null) { + for (String table : tablesToUse) { + tables.put(TableName.valueOf(table), + TableName.valueOf(tableMap[i++])); + } + } + } + } + + void setupTime(Configuration conf, String option) throws IOException { + String val = conf.get(option); + if (null == val) { + return; + } + long ms; + try { + // first try to parse in user friendly form + ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime(); + } catch (ParseException pe) { + try { + // then see if just a number of ms's was specified + ms = Long.parseLong(val); + } catch (NumberFormatException nfe) { + throw new IOException(option + + " must be specified either in the form 2001-02-20T16:35:06.99 " + + "or as number of milliseconds"); + } + } + conf.setLong(option, ms); + } + + /** + * Sets up the actual job. + * + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. + */ + public Job createSubmittableJob(String[] args) throws IOException { + Configuration conf = getConf(); + setupTime(conf, WALInputFormat.START_TIME_KEY); + setupTime(conf, WALInputFormat.END_TIME_KEY); + String inputDirs = args[0]; + String[] tables = args[1].split(","); + String[] tableMap; + if (args.length > 2) { + tableMap = args[2].split(","); + if (tableMap.length != tables.length) { + throw new IOException("The same number of tables and mapping must be provided."); + } + } else { + // if not mapping is specified map each table to itself + tableMap = tables; + } + conf.setStrings(TABLES_KEY, tables); + conf.setStrings(TABLE_MAP_KEY, tableMap); + conf.set(FileInputFormat.INPUT_DIR, inputDirs); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis())); + job.setJarByClass(WALPlayer.class); + + job.setInputFormatClass(WALInputFormat.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + if (hfileOutPath != null) { + LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); + + // the bulk HFile case + if (tables.length != 1) { + throw new IOException("Exactly one table must be specified for the bulk export option"); + } + TableName tableName = TableName.valueOf(tables[0]); + job.setMapperClass(WALKeyValueMapper.class); + job.setReducerClass(KeyValueSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputValueClass(KeyValue.class); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); + } + TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), + org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); + } else { + // output to live cluster + job.setMapperClass(WALMapper.class); + job.setOutputFormatClass(MultiTableOutputFormat.class); + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + // No reducers. 
+      job.setNumReduceTasks(0);
+    }
+    String codecCls = WALCellCodec.getWALCellCodecClass(conf);
+    try {
+      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Class.forName(codecCls));
+    } catch (Exception e) {
+      throw new IOException("Cannot determine wal codec class " + codecCls, e);
+    }
+    return job;
+  }
+
+
+  /**
+   * Print usage
+   * @param errorMsg Error message. Can be null.
+   */
+  private void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: " + NAME + " [options] <WAL inputdir> <tables> [<tableMappings>]");
+    System.err.println("Read all WAL entries for <tables>.");
+    System.err.println("If no tables (\"\") are specified, all tables are imported.");
+    System.err.println("(Careful, even hbase:meta entries will be imported" + " in that case.)");
+    System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
+    System.err.println("The WAL entries can be mapped to a new set of tables via <tableMappings>.");
+    System.err.println("<tableMappings> is a comma separated list of target tables.");
+    System.err.println("If specified, each table in <tables> must have a mapping.\n");
+    System.err.println("By default " + NAME + " will load data directly into HBase.");
+    System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("  (Only one table can be specified, and no mapping is allowed!)");
+    System.err.println("Other options: (specify time range of WAL edits to consider)");
+    System.err.println("  -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
+    System.err.println("  -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
+    System.err.println("  -D " + JOB_NAME_CONF_KEY
+        + "=jobName - use the specified mapreduce job name for the wal player");
+    System.err.println("For performance also consider the following options:\n"
+        + "  -Dmapreduce.map.speculative=false\n"
+        + "  -Dmapreduce.reduce.speculative=false");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(args);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
new file mode 100644
index 0000000..b1f15ba
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+Provides HBase MapReduce
+Input/OutputFormats, a table indexing MapReduce job, and utility methods.
+
+See HBase and MapReduce
+in the HBase Reference Guide for mapreduce over hbase documentation.
+*/
+package org.apache.hadoop.hbase.mapreduce;
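
For context, a minimal sketch of driving the WALPlayer tool added by this patch from Java
rather than from the command line. The WAL directory and table names below are illustrative
placeholders, and the WALPlayerDriver class is hypothetical; the WALPlayer constructor, the
Tool/ToolRunner wiring, and the WALInputFormat time-range keys are taken from the code in
the diff above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class WALPlayerDriver { // hypothetical driver, not part of this commit
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Optional time window: WALPlayer.setupTime() accepts either epoch millis
    // or the yyyy-MM-dd'T'HH:mm:ss.SS date form for these keys.
    conf.set(WALInputFormat.START_TIME_KEY, "2017-08-01T00:00:00.00");
    conf.set(WALInputFormat.END_TIME_KEY, "2017-08-02T00:00:00.00");
    // Arguments: <WAL inputdir> <tables> [<tableMappings>]; values are placeholders.
    int exitCode = ToolRunner.run(conf, new WALPlayer(),
        new String[] { "hdfs:///hbase/oldWALs", "sourceTable", "targetTable" });
    System.exit(exitCode);
  }
}

This roughly corresponds to invoking the tool on the command line as
hbase org.apache.hadoop.hbase.mapreduce.WALPlayer <WAL inputdir> <tables> [<tableMappings>];
setting -Dwal.bulk.output=/some/output/path (a placeholder path) would switch the job to the
single-table bulk-HFile mode described in the WALPlayer class javadoc.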