incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Fixed BLUR-91
Date Sun, 19 May 2013 02:03:54 GMT
Updated Branches:
  refs/heads/0.1.5 49ef3aa7a -> 77b3aedbd


Fixed BLUR-91


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/903d3c89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/903d3c89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/903d3c89

Branch: refs/heads/0.1.5
Commit: 903d3c8929d2b18d7908426189a18eed87d86155
Parents: 49ef3aa
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat May 18 21:36:45 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat May 18 21:36:45 2013 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurOutputCommitter.java    |   19 ++-
 .../blur/mapreduce/lib/BlurOutputFormat.java       |   81 ++-------
 .../blur/mapreduce/lib/BlurOutputFormatTest.java   |  142 +++++++++++++--
 3 files changed, 155 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/903d3c89/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
index 83fcde1..80d12dd 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
@@ -19,6 +19,7 @@ package org.apache.blur.mapreduce.lib;
 import java.io.IOException;
 
 import org.apache.blur.mapred.AbstractOutputCommitter;
+import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -33,10 +34,19 @@ public class BlurOutputCommitter extends AbstractOutputCommitter {
   private Configuration _configuration;
   private TaskAttemptID _taskAttemptID;
   private Path _indexPath;
+  private final boolean _runTaskCommit;
+  
+  public BlurOutputCommitter() {
+    _runTaskCommit = true;
+  }
+
+  public BlurOutputCommitter(boolean isMap, int numberOfReducers) {
+    _runTaskCommit = isMap && numberOfReducers != 0 ? false : true;
+  }
 
   @Override
   public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-    return true;
+    return _runTaskCommit;
   }
 
   @Override
@@ -62,9 +72,12 @@ public class BlurOutputCommitter extends AbstractOutputCommitter {
     fileSystem.delete(_indexPath, true);
   }
   
-  private void setup(TaskAttemptContext context) {
+  private void setup(TaskAttemptContext context) throws IOException {
     _configuration = context.getConfiguration();
-    int shardId = context.getTaskAttemptID().getTaskID().getId();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    int shardCount = tableDescriptor.getShardCount();
+    int attemptId = context.getTaskAttemptID().getTaskID().getId();
+    int shardId = attemptId % shardCount;
     _taskAttemptID = context.getTaskAttemptID();
     Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
     String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/903d3c89/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index 77ee8bc..b6c62cf 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -106,21 +105,7 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-    int numReduceTasks = context.getNumReduceTasks();
-    if (numReduceTasks != 0) {
-      try {
-        Class<? extends OutputFormat<?, ?>> outputFormatClass = context.getOutputFormatClass();
-        if (outputFormatClass.equals(BlurOutputFormat.class)) {
-          // Then only reducer needs committer.
-          if (context.getTaskAttemptID().isMap()) {
-            return getDoNothing();
-          }
-        }
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
-      }
-    }
-    return new BlurOutputCommitter();
+    return new BlurOutputCommitter(context.getTaskAttemptID().isMap(),context.getNumReduceTasks());
   }
 
   public static TableDescriptor getTableDescriptor(Configuration configuration) throws IOException {
@@ -141,6 +126,14 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     return descriptor;
   }
 
+  public static void setReducerMultiplier(Job job, int multiple) throws IOException {
+    TableDescriptor tableDescriptor = getTableDescriptor(job.getConfiguration());
+    if (tableDescriptor == null) {
+      throw new IOException("setTableDescriptor needs to be called first.");
+    }
+    job.setNumReduceTasks(tableDescriptor.getShardCount() * multiple);
+  }
+
   public static void setTableDescriptor(Job job, TableDescriptor tableDescriptor) throws IOException {
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
     TIOStreamTransport transport = new TIOStreamTransport(outputStream);
@@ -202,15 +195,17 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     private RateCounter _copyRateCounter;
     private IndexWriter _localTmpWriter;
     private boolean _usingLocalTmpindex;
-
     private File _localTmpPath;
-
     private ProgressableDirectory _localTmpDir;
-
     private Counter _rowOverFlowCount;
 
-    public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int shardId, String tmpDirName)
+    public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int attemptId, String tmpDirName)
         throws IOException {
+      
+      TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+      int shardCount = tableDescriptor.getShardCount();
+      int shardId = attemptId % shardCount;
+      
       _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(configuration);
       Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
       String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
@@ -219,8 +214,7 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
       _finalDir = new ProgressableDirectory(new HdfsDirectory(configuration, newIndex),
           BlurOutputFormat.getProgressable());
       _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
-
-      TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+      
       _analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzerDefinition());
       _conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, _analyzer);
       TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
@@ -387,47 +381,4 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     job.setOutputFormatClass(BlurOutputFormat.class);
     setTableDescriptor(job, tableDescriptor);
   }
-
-  private OutputCommitter getDoNothing() {
-    return new OutputCommitter() {
-
-      @Override
-      public void commitJob(JobContext jobContext) throws IOException {
-      }
-
-      @Override
-      public void cleanupJob(JobContext context) throws IOException {
-      }
-
-      @Override
-      public void abortJob(JobContext jobContext, State state) throws IOException {
-      }
-
-      @Override
-      public void setupTask(TaskAttemptContext taskContext) throws IOException {
-
-      }
-
-      @Override
-      public void setupJob(JobContext jobContext) throws IOException {
-
-      }
-
-      @Override
-      public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
-        return false;
-      }
-
-      @Override
-      public void commitTask(TaskAttemptContext taskContext) throws IOException {
-
-      }
-
-      @Override
-      public void abortTask(TaskAttemptContext taskContext) throws IOException {
-
-      }
-    };
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/903d3c89/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index 56186fe..61fbe17 100644
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -25,6 +25,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.TreeSet;
 
 import org.apache.blur.store.buffer.BufferStore;
 import org.apache.blur.store.hdfs.HdfsDirectory;
@@ -32,9 +34,10 @@ import org.apache.blur.thrift.generated.AnalyzerDefinition;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
@@ -51,6 +54,7 @@ public class BlurOutputFormatTest {
   private static FileSystem localFs;
   private static MiniMRCluster mr;
   private static Path TEST_ROOT_DIR;
+  private static JobConf jobConf;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -63,6 +67,7 @@ public class BlurOutputFormatTest {
       throw new RuntimeException("problem getting local fs", io);
     }
     mr = new MiniMRCluster(2, "file:///", 3);
+    jobConf = mr.createJobConf();
     BufferStore.init(128, 128);
   }
 
@@ -80,15 +85,10 @@ public class BlurOutputFormatTest {
     writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
     writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
 
-    Job job = new Job(conf, "blur index");
+    Job job = new Job(jobConf, "blur index");
     job.setJarByClass(BlurOutputFormatTest.class);
     job.setMapperClass(CsvBlurMapper.class);
-    job.setReducerClass(DefaultBlurReducer.class);
-    job.setNumReduceTasks(1);
     job.setInputFormatClass(TrackingTextInputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(BlurMutate.class);
-    job.setOutputFormatClass(BlurOutputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
@@ -98,18 +98,34 @@ public class BlurOutputFormatTest {
     tableDescriptor.setShardCount(1);
     tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
     tableDescriptor.setTableUri(tableUri);
-    BlurOutputFormat.setTableDescriptor(job, tableDescriptor);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
 
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
 
-    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, new Path(new Path(tableUri, BlurUtil
-        .getShardName(0)), "attempt_local_0001_r_000000_0.commit")));
+    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Collection<Path> commitedTasks = getCommitedTasks(path);
+    assertEquals(1, commitedTasks.size());
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
     assertEquals(2, reader.numDocs());
     reader.close();
   }
 
+  private Collection<Path> getCommitedTasks(Path path) throws IOException {
+    Collection<Path> result = new TreeSet<Path>();
+    FileSystem fileSystem = path.getFileSystem(jobConf);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus fileStatus : listStatus) {
+      Path p = fileStatus.getPath();
+      if (fileStatus.isDir() && p.getName().endsWith(".commit")) {
+        result.add(p);
+      }
+    }
+    return result;
+  }
+
   @Test
   public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
     localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
@@ -118,15 +134,10 @@ public class BlurOutputFormatTest {
     writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
     writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
 
-    Job job = new Job(conf, "blur index");
+    Job job = new Job(jobConf, "blur index");
     job.setJarByClass(BlurOutputFormatTest.class);
     job.setMapperClass(CsvBlurMapper.class);
-    job.setReducerClass(DefaultBlurReducer.class);
-    job.setNumReduceTasks(1);
     job.setInputFormatClass(TrackingTextInputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(BlurMutate.class);
-    job.setOutputFormatClass(BlurOutputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
@@ -136,18 +147,111 @@ public class BlurOutputFormatTest {
     tableDescriptor.setShardCount(1);
     tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
     tableDescriptor.setTableUri(tableUri);
-    BlurOutputFormat.setTableDescriptor(job, tableDescriptor);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
 
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
 
-    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, new Path(new Path(tableUri, BlurUtil
-        .getShardName(0)), "attempt_local_0002_r_000000_0.commit")));
+    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Collection<Path> commitedTasks = getCommitedTasks(path);
+    assertEquals(1, commitedTasks.size());
+
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
     assertEquals(80000, reader.numDocs());
     reader.close();
   }
 
+  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(tableUri);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Collection<Path> commitedTasks = getCommitedTasks(path);
+      assertEquals(1, commitedTasks.size());
+
+      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+      total += reader.numDocs();
+      reader.close();
+    }
+    assertEquals(80000, total);
+
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersWithReduceMultiplierTest() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(tableUri);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    int multiple = 2;
+    BlurOutputFormat.setReducerMultiplier(job, multiple);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Collection<Path> commitedTasks = getCommitedTasks(path);
+      assertEquals(multiple, commitedTasks.size());
+      for (Path p : commitedTasks) {
+        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, p));
+        total += reader.numDocs();
+        reader.close();
+      }
+    }
+    assertEquals(80000, total);
+
+  }
+
   public static String readFile(String name) throws IOException {
     DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
     BufferedReader b = new BufferedReader(new InputStreamReader(f));


Mime
View raw message