From: liyin@apache.org
To: commits@hbase.apache.org
Subject: svn commit: r1509361 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/
Date: Thu, 01 Aug 2013 18:18:21 -0000
Message-Id: <20130801181821.56E9423889E1@eris.apache.org>

Author: liyin
Date: Thu Aug 1 18:18:20 2013
New Revision: 1509361

URL: http://svn.apache.org/r1509361
Log:
[master] support incremental hfile in task subdir

Author: fan

Summary: support incremental hfile in task subdir

Test Plan: a unit test for creating subdir path

Reviewers: adela, liyintang, manukranthk, aaiyer

Reviewed By: liyintang

CC: hbase-eng@, vinodv

Differential Revision: https://phabricator.fb.com/D896850

Task ID: 2633302

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
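For readers following along: this revision lets a map-only job write HFiles through HFileOutputFormat and bulk-load them from each task's cleanup() hook, so data lands incrementally from each task's work subdirectory instead of via a reduce phase plus a separate load step. The driver setup below is a minimal sketch of that pattern, lifted from the new test further down; NMapInputFormat, configAsMapOutputFormat, and SimpleKVMapper are the 0.89-fb helpers the test uses, while conf, table, and outputDir are assumed to exist.

    // Sketch of a per-task bulk-load job (0.89-fb APIs; see testUploadByTask below).
    Job job = new Job(conf, "perTaskBulkLoad");
    job.setInputFormatClass(NMapInputFormat.class);         // synthetic splits, no input files
    job.setMapperClass(SimpleKVMapper.class);               // bulk-loads its own output in cleanup()
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    job.setNumReduceTasks(0);                               // map-only: HFiles stay in the task subdir
    HFileOutputFormat.configAsMapOutputFormat(job, table);  // branch-specific config helper
    FileOutputFormat.setOutputPath(job, outputDir);
    job.waitForCompletion(true);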
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1509361&r1=1509360&r2=1509361&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Thu Aug 1 18:18:20 2013
@@ -21,15 +21,12 @@ package org.apache.hadoop.hbase.mapreduc
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -58,13 +55,12 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
 import org.apache.hadoop.hbase.master.RegionPlacement;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -93,6 +89,8 @@ public class HFileOutputFormat extends F
   static final String TABLE_NAME = "hbase.hfileoutputformat.tablename";
   static final String UTF8 = "UTF-8";
 
+  protected static RecordWriter<ImmutableBytesWritable, KeyValue> latestWriter = null;
+
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
     // Get the path of the temporary output file
@@ -100,6 +98,7 @@ public class HFileOutputFormat extends F
     final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
     final Configuration conf = context.getConfiguration();
     final FileSystem fs = outputdir.getFileSystem(conf);
+    // These configs are from hbase-*.xml
     final long maxsize = conf.getLong("hbase.hregion.max.filesize",
         HConstants.DEFAULT_MAX_FILE_SIZE);
@@ -125,7 +124,8 @@ public class HFileOutputFormat extends F
     final Pair startKeysAndFavoredNodes = (table == null ? null :
         table.getStartKeysAndFavoredNodes());
 
-    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
+    RecordWriter<ImmutableBytesWritable, KeyValue> writer =
+        new RecordWriter<ImmutableBytesWritable, KeyValue>() {
       // Map of families to writers and how much has been output on the writer.
      private final Map writers =
          new TreeMap(Bytes.BYTES_COMPARATOR);
@@ -266,6 +266,7 @@ public class HFileOutputFormat extends F
          */
         w.appendMetadata(HConstants.NO_MIN_FLUSH_TIME, 0, false);
         w.close();
+        }
       }
@@ -276,6 +277,9 @@ public class HFileOutputFormat extends F
         }
       }
     };
+
+    latestWriter = writer;
+    return writer;
   }
 
   /*

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1509361&r1=1509360&r2=1509361&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Thu Aug 1 18:18:20 2013
@@ -74,10 +74,13 @@ public class LoadIncrementalHFiles exten
   public static String EXIT_ON_FIRST_FAILURE =
       "hbase.mapreduce.bulkload.failure.exitOnFirst";
   private boolean exitOnFirstFailure;
 
+  private Configuration conf;
+
   public LoadIncrementalHFiles(Configuration conf) {
     super(conf);
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
     exitOnFirstFailure = conf.getBoolean(EXIT_ON_FIRST_FAILURE, true);
+    this.conf = conf;
   }
 
   public LoadIncrementalHFiles() {
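The LoadIncrementalHFiles change above simply retains the Configuration it was constructed with, which the new mapper-side cleanup path (next file) relies on. As a usage note, the loader can also be driven programmatically against any directory of HFiles; a minimal sketch using the doBulkLoad(Path, HTable) signature the test exercises, where the table name and HFile directory are illustrative assumptions rather than values from this commit:

    // Programmatic bulk load (illustrative names, era-appropriate 0.89-fb API).
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "myTable");                  // assumed existing table
    Path hfileDir = new Path("/tmp/bulkload/myTable");           // assumed directory of HFiles
    new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, table); // one HFile tree -> one table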
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1509361&r1=1509360&r2=1509361&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Thu Aug 1 18:18:20 2013
@@ -26,16 +26,15 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Map.Entry;
+import java.util.Random;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -46,13 +45,12 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -65,18 +63,14 @@ import org.apache.hadoop.hbase.io.Immuta
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.master.AssignmentPlan;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.master.RegionPlacement;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -92,8 +86,6 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Lists;
-
 /**
  * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
  * Sets up and runs a mapreduce job that writes hfile output.
@@ -253,6 +245,50 @@ public class TestHFileOutputFormat {
     }
   }
 
+  static class SimpleKVMapper
+      extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, KeyValue> {
+
+    static final String testKey = "testKey";
+    static final String testValue = "testValue";
+
+    @Override
+    protected void map(
+        NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable, ImmutableBytesWritable, KeyValue>.Context context)
+        throws IOException, InterruptedException {
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
+
+      String taskIdString = context.getTaskAttemptID().getTaskID().toString();
+      byte[] keyBytes = Bytes.toBytes(testKey + taskIdString);
+      byte[] valBytes = Bytes.toBytes(testValue);
+      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+      for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+        KeyValue kv = new KeyValue(keyBytes, family,
+            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        context.write(key, kv);
+      }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      HTable table = new HTable(conf, TABLE_NAME);
+
+      Path outputPath = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = outputPath.getFileSystem(conf);
+      Path workOutputPath = FileOutputFormat.getWorkOutputPath(context).makeQualified(fs);
+
+      // Force flushing the HFile into the working directory
+      HFileOutputFormat.latestWriter.close(context);
+
+      new LoadIncrementalHFiles(conf).doBulkLoad(workOutputPath, table);
+    }
+  };
+
   /**
    * Test for the union style MR jobs that run both Put and Delete requests
   * @throws Exception on job, sorting, IO or fs errors
   */
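Two details in the cleanup() above carry the commit's intent. First, FileOutputFormat.getWorkOutputPath(context) resolves to the task attempt's private work subdirectory under the job output path (the "task subdir" of the commit title), so each task bulk-loads only the HFiles it wrote itself. Second, the static HFileOutputFormat.latestWriter field is the mapper's only handle on the RecordWriter the framework created; it must be closed before doBulkLoad(), or the HFiles are still open and invisible to the loader. The hook assumes one active writer per task JVM. A sketch of the resulting layout, assuming default FileOutputCommitter behavior (paths are illustrative, not from the commit):

    // Job output path (final, committed):  <outputPath>/
    // Per-task work subdir (pre-commit):   <outputPath>/_temporary/_attempt_..._m_000003_0/
    //   <family>/<hfile>                   -- HFiles written by this attempt only
    // cleanup() closes latestWriter, then bulk-loads the work subdir into the table,
    // so data is served as soon as each task finishes, before job commit.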
@@ -621,6 +657,55 @@ public class TestHFileOutputFormat {
   }
 
   /**
+   * Test for uploading map output in the cleanup stage of each task
+   * @throws Exception
+   */
+  @Test
+  public void testUploadByTask() throws Exception {
+    try {
+      MiniHBaseCluster cluster = util.startMiniCluster();
+      cluster.getMaster();
+      HTable table = util.createTable(TABLE_NAME, FAMILIES);
+      Configuration conf = table.getConfiguration();
+      Path testDir = util.getTestDir("testUploadByTask");
+
+      // Generate the bulk load files
+      util.startMiniMapReduceCluster();
+
+      Job job = new Job(conf, "testUploadByTask");
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setMapperClass(SimpleKVMapper.class);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(KeyValue.class);
+      job.setOutputFormatClass(HFileOutputFormat.class);
+      job.setNumReduceTasks(0);
+      HFileOutputFormat.configAsMapOutputFormat(job, table);
+      FileOutputFormat.setOutputPath(job, testDir);
+      assertTrue(job.waitForCompletion(true));
+
+      // Ensure data shows up
+      int expectedRows = conf.getInt("mapred.map.tasks", 1);
+      assertEquals("LoadIncrementalHFiles should put a row per task",
+          expectedRows, util.countRows(table));
+      Scan scan = new Scan();
+      ResultScanner results = table.getScanner(scan);
+      for (Result res : results) {
+        assertEquals(FAMILIES.length, res.raw().length);
+        for (KeyValue kv : res.raw()) {
+          assertTrue("Key should start with the pre-defined test key",
+              Bytes.toStringBinary(kv.getRow()).startsWith(SimpleKVMapper.testKey));
+          assertEquals("Value should equal the pre-defined value",
+              SimpleKVMapper.testValue, Bytes.toString(kv.getValue()));
+        }
+      }
+      results.close();
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      util.shutdownMiniCluster();
+    }
+  }
+
+  /**
    * Test for
    * {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}. Tests
    * that the compression map is correctly deserialized from configuration