hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1457078 - in /hbase/trunk: hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
Date Fri, 15 Mar 2013 19:45:09 GMT
Author: stack
Date: Fri Mar 15 19:45:09 2013
New Revision: 1457078

URL: http://svn.apache.org/r1457078
Log:
HBASE-4285 partitions file created in user's home directory by importtsv

Modified:
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java?rev=1457078&r1=1457077&r2=1457078&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java Fri Mar 15 19:45:09 2013
@@ -5,6 +5,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Set;
@@ -15,6 +16,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
@@ -25,6 +27,7 @@ import org.apache.hadoop.hbase.client.HT
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -137,6 +140,18 @@ public class IntegrationTestImportTsv im
     }
   }
 
+  /**
+   * Confirm the absence of the {@link TotalOrderPartitioner} partitions file.
+   */
+  protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
+    if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false))
+      return;
+
+    FileSystem fs = FileSystem.get(conf);
+    Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
+    assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
+  }
+
   @Test
   public void testGenerateAndLoad() throws Exception {
     String table = NAME + "-" + UUID.randomUUID();
@@ -155,8 +170,13 @@ public class IntegrationTestImportTsv im
 
     // run the job, complete the load.
     util.createTable(table, cf);
-    TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
+    Tool t = TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
     doLoadIncrementalHFiles(hfiles, table);
+
+    // validate post-conditions
+    validateDeletedPartitionsFile(t.getConf());
+
+    // clean up after ourselves.
     util.deleteTable(table);
     util.cleanupDataTestDirOnTestFS(table);
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1457078&r1=1457077&r2=1457078&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Fri Mar 15 19:45:09 2013
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.mapreduc
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -37,7 +35,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -54,9 +51,9 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -267,13 +264,12 @@ public class HFileOutputFormat extends F
   }
 
   /**
-   * Write out a SequenceFile that can be read by TotalOrderPartitioner
-   * that contains the split points in startKeys.
-   * @param partitionsPath output path for SequenceFile
-   * @param startKeys the region start keys
+   * Write out a {@link SequenceFile} that can be read by
+   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
    */
   private static void writePartitions(Configuration conf, Path partitionsPath,
       List<ImmutableBytesWritable> startKeys) throws IOException {
+    LOG.info("Writing partition information to " + partitionsPath);
     if (startKeys.isEmpty()) {
       throw new IllegalArgumentException("No regions passed");
     }
@@ -325,7 +321,6 @@ public class HFileOutputFormat extends F
   throws IOException {
     Configuration conf = job.getConfiguration();
 
-    job.setPartitionerClass(TotalOrderPartitioner.class);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(KeyValue.class);
     job.setOutputFormatClass(HFileOutputFormat.class);
@@ -341,29 +336,14 @@ public class HFileOutputFormat extends F
       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
     }
 
+    // Use table's region boundaries for TOP split points.
     LOG.info("Looking up current regions for table " + table);
     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
         "to match current region count");
     job.setNumReduceTasks(startKeys.size());
 
-    Path partitionsPath = new Path(job.getWorkingDirectory(),
-                                   "partitions_" + UUID.randomUUID());
-    LOG.info("Writing partition information to " + partitionsPath);
-
-    FileSystem fs = partitionsPath.getFileSystem(conf);
-    writePartitions(conf, partitionsPath, startKeys);
-    partitionsPath.makeQualified(fs);
-
-    URI cacheUri;
-    try {
-      cacheUri = new URI(partitionsPath.toString() + "#" + TotalOrderPartitioner.DEFAULT_PATH);
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-    DistributedCache.addCacheFile(cacheUri, conf);
-    DistributedCache.createSymlink(conf);
-
+    configurePartitioner(job, startKeys);
     // Set compression algorithms based on column families
     configureCompression(table, conf);
     configureBloomType(table, conf);
@@ -415,7 +395,26 @@ public class HFileOutputFormat extends F
     }
     return confValMap;
   }
-  
+
+  /**
+   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
+   * <code>splitPoints</code>. Cleans up the partitions file after job exits.
+   */
+  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
+      throws IOException {
+
+    // create the partitions file
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+    Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
+    fs.makeQualified(partitionsPath);
+    fs.deleteOnExit(partitionsPath);
+    writePartitions(job.getConfiguration(), partitionsPath, splitPoints);
+
+    // configure job to use it
+    job.setPartitionerClass(TotalOrderPartitioner.class);
+    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
+  }
+
   /**
    * Serialize column family to compression algorithm map to configuration.
    * Invoked while configuring the MR job for incremental load.



Mime
View raw message