incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Fixed issue when task was aborted deleting shard index instead of just the attempt.
Date Sun, 19 May 2013 02:03:55 GMT
Fixed an issue where aborting a task deleted the entire shard index directory instead of just that task attempt's output.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/77b3aedb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/77b3aedb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/77b3aedb

Branch: refs/heads/0.1.5
Commit: 77b3aedbdffac1ab0b7876b7c74749358cef40c8
Parents: 903d3c8
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat May 18 22:03:01 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat May 18 22:03:01 2013 -0400

----------------------------------------------------------------------
 .../blur/mapred/AbstractOutputCommitter.java       |   15 +++++++++-
 .../blur/mapreduce/lib/BlurOutputCommitter.java    |   22 ++++++++++-----
 .../blur/mapreduce/lib/BlurOutputFormat.java       |   15 +++++-----
 .../blur/mapreduce/lib/BlurOutputFormatTest.java   |    4 +-
 4 files changed, 39 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/77b3aedb/src/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
b/src/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
index 65bc9ea..1545475 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
@@ -5,7 +5,9 @@ import java.io.IOException;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -28,9 +30,10 @@ public abstract class AbstractOutputCommitter extends OutputCommitter {
     // look through all the shards for attempts that need to be cleaned up.
     // also find all the attempts that are finished
     // then rename all the attempts jobs to commits
-    LOG.info("Commiting Job [{0}]",jobContext.getJobID());
+    LOG.info("Commiting Job [{0}]", jobContext.getJobID());
     Configuration configuration = jobContext.getConfiguration();
     Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    makeSureNoEmptyShards(configuration, tableOutput);
     FileSystem fileSystem = tableOutput.getFileSystem(configuration);
     for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
       if (isShard(fileStatus)) {
@@ -40,6 +43,16 @@ public abstract class AbstractOutputCommitter extends OutputCommitter {
 
   }
 
+  private void makeSureNoEmptyShards(Configuration configuration, Path tableOutput) throws
IOException {
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    int shardCount = tableDescriptor.getShardCount();
+    for (int i = 0; i < shardCount; i++) {
+      String shardName = BlurUtil.getShardName(i);
+      fileSystem.mkdirs(new Path(tableOutput, shardName));
+    }
+  }
+
   private void commitJob(JobContext jobContext, Path shardPath) throws IOException {
     FileSystem fileSystem = shardPath.getFileSystem(jobContext.getConfiguration());
     FileStatus[] listStatus = fileSystem.listStatus(shardPath);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/77b3aedb/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
index 80d12dd..6b485a9 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
@@ -18,6 +18,8 @@ package org.apache.blur.mapreduce.lib;
  */
 import java.io.IOException;
 
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 import org.apache.blur.mapred.AbstractOutputCommitter;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
@@ -30,12 +32,15 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 
 public class BlurOutputCommitter extends AbstractOutputCommitter {
 
+  private static final Log LOG = LogFactory.getLog(BlurOutputCommitter.class);
+
   private Path _newIndex;
   private Configuration _configuration;
   private TaskAttemptID _taskAttemptID;
   private Path _indexPath;
   private final boolean _runTaskCommit;
-  
+  private TableDescriptor _tableDescriptor;
+
   public BlurOutputCommitter() {
     _runTaskCommit = true;
   }
@@ -51,7 +56,7 @@ public class BlurOutputCommitter extends AbstractOutputCommitter {
 
   @Override
   public void setupTask(TaskAttemptContext context) throws IOException {
-    
+
   }
 
   @Override
@@ -59,7 +64,9 @@ public class BlurOutputCommitter extends AbstractOutputCommitter {
     setup(context);
     FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
     if (fileSystem.exists(_newIndex) && !fileSystem.isFile(_newIndex)) {
-      fileSystem.rename(_newIndex, new Path(_indexPath, _taskAttemptID.toString() + ".task_complete"));
+      Path dst = new Path(_indexPath, _taskAttemptID.toString() + ".task_complete");
+      LOG.info("Committing [{0}] to [{1}]", _newIndex, dst);
+      fileSystem.rename(_newIndex, dst);
     } else {
       throw new IOException("Path [" + _newIndex + "] does not exist, can not commit.");
     }
@@ -69,13 +76,14 @@ public class BlurOutputCommitter extends AbstractOutputCommitter {
   public void abortTask(TaskAttemptContext context) throws IOException {
     setup(context);
     FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
-    fileSystem.delete(_indexPath, true);
+    LOG.info("abortTask - Deleting [{0}]", _newIndex);
+    fileSystem.delete(_newIndex, true);
   }
-  
+
   private void setup(TaskAttemptContext context) throws IOException {
     _configuration = context.getConfiguration();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
-    int shardCount = tableDescriptor.getShardCount();
+    _tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    int shardCount = _tableDescriptor.getShardCount();
     int attemptId = context.getTaskAttemptID().getTaskID().getId();
     int shardId = attemptId % shardCount;
     _taskAttemptID = context.getTaskAttemptID();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/77b3aedb/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index b6c62cf..2fc8303 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -105,7 +105,7 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate>
{
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
InterruptedException {
-    return new BlurOutputCommitter(context.getTaskAttemptID().isMap(),context.getNumReduceTasks());
+    return new BlurOutputCommitter(context.getTaskAttemptID().isMap(), context.getNumReduceTasks());
   }
 
   public static TableDescriptor getTableDescriptor(Configuration configuration) throws IOException
{
@@ -198,23 +198,24 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate>
{
     private File _localTmpPath;
     private ProgressableDirectory _localTmpDir;
     private Counter _rowOverFlowCount;
+    private final Path _newIndex;
 
     public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int attemptId,
String tmpDirName)
         throws IOException {
-      
+
       TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
       int shardCount = tableDescriptor.getShardCount();
       int shardId = attemptId % shardCount;
-      
+
       _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(configuration);
       Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
       String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
       Path indexPath = new Path(tableOutput, shardName);
-      Path newIndex = new Path(indexPath, tmpDirName);
-      _finalDir = new ProgressableDirectory(new HdfsDirectory(configuration, newIndex),
+      _newIndex = new Path(indexPath, tmpDirName);
+      _finalDir = new ProgressableDirectory(new HdfsDirectory(configuration, _newIndex),
           BlurOutputFormat.getProgressable());
       _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
-      
+
       _analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzerDefinition());
       _conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, _analyzer);
       TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
@@ -344,7 +345,7 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate>
{
       CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
       String[] fileNames = _localDir.listAll();
       for (String fileName : fileNames) {
-        LOG.info("Copying [{0}]", fileName);
+        LOG.info("Copying [{0}] to [{1}]", fileName, _newIndex);
         _localDir.copy(copyRateDirectory, fileName, fileName, IOContext.DEFAULT);
       }
       rm(_localPath);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/77b3aedb/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index 61fbe17..05c4a9d 100644
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -225,7 +225,7 @@ public class BlurOutputFormatTest {
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(2);
+    tableDescriptor.setShardCount(7);
     tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
     tableDescriptor.setTableUri(tableUri);
 
@@ -241,7 +241,7 @@ public class BlurOutputFormatTest {
     for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
       Path path = new Path(tableUri, BlurUtil.getShardName(i));
       Collection<Path> commitedTasks = getCommitedTasks(path);
-      assertEquals(multiple, commitedTasks.size());
+      assertTrue(multiple >= commitedTasks.size());
       for (Path p : commitedTasks) {
         DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, p));
         total += reader.numDocs();


Mime
View raw message