incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject git commit: BLUR:107 fixed.
Date Mon, 27 May 2013 05:06:01 GMT
Updated Branches:
  refs/heads/0.1.5 91afa673f -> 9a8b1da5c


BLUR:107 fixed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9a8b1da5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9a8b1da5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9a8b1da5

Branch: refs/heads/0.1.5
Commit: 9a8b1da5c3abf488b525fb11c9d012032d759b2e
Parents: 91afa67
Author: Gagan <gagandeepjuneja@gmail.com>
Authored: Mon May 27 10:35:28 2013 +0530
Committer: Gagan <gagandeepjuneja@gmail.com>
Committed: Mon May 27 10:35:28 2013 +0530

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurOutputFormat.java       |   23 ++++++-
 .../blur/mapreduce/lib/BlurOutputFormatTest.java   |   56 +++++++++++++--
 2 files changed, 73 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9a8b1da5/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index fc6820f..ae91eb8 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -41,6 +41,7 @@ import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
@@ -122,8 +123,28 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
   }
 
   @Override
-  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+  public void checkOutputSpecs(JobContext context) throws IOException,
+      InterruptedException {
+    Configuration config = context.getConfiguration();
+    TableDescriptor tableDescriptor = getTableDescriptor(config);
+    if (tableDescriptor == null) {
+      throw new IOException("setTableDescriptor needs to be called first.");
+    }
+    int shardCount = tableDescriptor.getShardCount();
+    FileSystem fileSystem = getOutputPath(config).getFileSystem(config);
+    Path tablePath = new Path(tableDescriptor.getTableUri());
+    if(fileSystem.exists(tablePath)) {
+      BlurUtil.validateShardCount(shardCount, fileSystem, tablePath);
+    }else{
+      throw new IOException("Table path [ "+ tablePath + " ] doesn't exist for table [ " + tableDescriptor.getName() + " ].");
+    }
 
+    int reducers = context.getNumReduceTasks();
+    int reducerMultiplier = getReducerMultiplier(config);
+    int validNumberOfReducers = reducerMultiplier * shardCount;
+    if (reducers > 0 && reducers != validNumberOfReducers) {
+      throw new IllegalArgumentException("Invalid number of reducers [ " + reducers +" ]." + " Number of Reducers should be [ " + validNumberOfReducers + " ].");
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9a8b1da5/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index 36bfba9..915ee89 100644
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -55,6 +55,8 @@ public class BlurOutputFormatTest {
   private static MiniMRCluster mr;
   private static Path TEST_ROOT_DIR;
   private static JobConf jobConf;
+  private Path outDir = new Path(TEST_ROOT_DIR + "/out");
+  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -80,8 +82,8 @@ public class BlurOutputFormatTest {
 
   @Test
   public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    localFs.delete(inDir, true);
+    localFs.delete(outDir, true);
     writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
     writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
 
@@ -98,7 +100,9 @@ public class BlurOutputFormatTest {
     tableDescriptor.setShardCount(1);
     tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
     tableDescriptor.setTableUri(tableUri);
-
+    
+    createShardDirectories(outDir,1);
+    
     BlurOutputFormat.setupJob(job, tableDescriptor);
 
     assertTrue(job.waitForCompletion(true));
@@ -148,6 +152,8 @@ public class BlurOutputFormatTest {
     tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
     tableDescriptor.setTableUri(tableUri);
 
+    createShardDirectories(outDir,1);
+    
     BlurOutputFormat.setupJob(job, tableDescriptor);
     BlurOutputFormat.setIndexLocally(job, true);
     BlurOutputFormat.setOptimizeInFlight(job, false);
@@ -182,12 +188,14 @@ public class BlurOutputFormatTest {
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
-
+      
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(2);
     tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
     tableDescriptor.setTableUri(tableUri);
-
+    
+    createShardDirectories(outDir,2);
+    
     BlurOutputFormat.setupJob(job, tableDescriptor);
     BlurOutputFormat.setIndexLocally(job, false);
 
@@ -232,6 +240,8 @@ public class BlurOutputFormatTest {
     tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
     tableDescriptor.setTableUri(tableUri);
 
+    createShardDirectories(outDir,7);
+    
     BlurOutputFormat.setupJob(job, tableDescriptor);
     int multiple = 2;
     BlurOutputFormat.setReducerMultiplier(job, multiple);
@@ -254,6 +264,36 @@ public class BlurOutputFormatTest {
     assertEquals(80000, total);
 
   }
+  
+  @Test (expected = IllegalArgumentException.class)
+  public void testBlurOutputFormatValidateReducerCount() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(tableUri);
+    
+    createShardDirectories(outDir,1);
+    
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setReducerMultiplier(job, 2);
+    job.setNumReduceTasks(4);
+    job.submit();
+    
+  }
 
   public static String readFile(String name) throws IOException {
     DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
@@ -285,6 +325,12 @@ public class BlurOutputFormatTest {
     return file;
   }
 
+  private void createShardDirectories(Path outDir, int shardCount) throws IOException{
+    localFs.mkdirs(outDir);
+    for(int i=0; i<shardCount; i++){
+      localFs.mkdirs(new Path(outDir, BlurUtil.getShardName(i)));
+    }
+  }
   private String getRecord(int rowId, int recordId, String family) {
     return rowId + "," + recordId + "," + family + ",valuetoindex";
   }


Mime
View raw message