incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/5] git commit: BlurOutputCommitter extends org.apache.hadoop.mapreduce.OutputCommitter
Date Fri, 26 Sep 2014 00:17:23 GMT
BlurOutputCommitter extends org.apache.hadoop.mapreduce.OutputCommitter

Signed-off-by: Aaron McCurry <amccurry@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/fe30c202
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/fe30c202
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/fe30c202

Branch: refs/heads/master
Commit: fe30c20230f33a6a4ec137fa59ff52c4ae183342
Parents: 2ce56fb
Author: Dibyendu Bhattacharya <dibyendu.bhattacharya1@pearson.com>
Authored: Wed Sep 24 19:00:49 2014 +0530
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Sep 25 20:06:30 2014 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurOutputCommitter.java | 120 +++++++++----------
 1 file changed, 56 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/fe30c202/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
index 5a3bedb..e1d94df 100644
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
@@ -35,14 +35,62 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
-public class BlurOutputCommitter extends OutputCommitter {
 
-	private static final Log LOG = LogFactory.getLog(BlurOutputCommitter.class);
+public class BlurOutputCommitter extends OutputCommitter {
 
-	private Path _newIndex;
-	private Configuration _configuration;
-	private Path _indexPath;
-	private TableDescriptor _tableDescriptor;
+  private static final Log LOG = LogFactory.getLog(BlurOutputCommitter.class);
+
+  private Path _newIndex;
+  private Configuration _configuration;
+  private TaskAttemptID _taskAttemptID;
+  private Path _indexPath;
+  private TableDescriptor _tableDescriptor;
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+    int numReduceTasks = context.getNumReduceTasks();
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    return taskAttemptID.isMap() && numReduceTasks != 0 ? false : true;
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+	LOG.info("Running Task setup.");
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    setup(context);
+    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
+    if (fileSystem.exists(_newIndex) && !fileSystem.isFile(_newIndex)) {
+      Path dst = new Path(_indexPath, _taskAttemptID.toString() + ".task_complete");
+      LOG.info("Committing [{0}] to [{1}]", _newIndex, dst);
+      fileSystem.rename(_newIndex, dst);
+    } else {
+      throw new IOException("Path [" + _newIndex + "] does not exist, can not commit.");
+    }
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    setup(context);
+    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
+    LOG.info("abortTask - Deleting [{0}]", _newIndex);
+    fileSystem.delete(_newIndex, true);
+  }
+
+  private void setup(TaskAttemptContext context) throws IOException {
+    _configuration = context.getConfiguration();
+    _tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    int shardCount = _tableDescriptor.getShardCount();
+    int attemptId = context.getTaskAttemptID().getTaskID().getId();
+    int shardId = attemptId % shardCount;
+    _taskAttemptID = context.getTaskAttemptID();
+    Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
+    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+    _indexPath = new Path(tableOutput, shardName);
+    _newIndex = new Path(_indexPath, _taskAttemptID.toString() + ".tmp");
+  }
 
 	@Override
 	public void commitJob(JobContext jobContext) throws IOException {
@@ -164,64 +212,8 @@ public class BlurOutputCommitter extends OutputCommitter {
 	}
 
 	@Override
-	public void abortTask(TaskAttemptContext context)
-			throws IOException {
-		setup(context);
-		FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
-		LOG.info("abortTask - Deleting [{0}]", _newIndex);
-		fileSystem.delete(_newIndex, true);
-	}
-
-	private void setup(TaskAttemptContext context) throws IOException {
-		LOG.info("Running task setup.");
-		_configuration = context.getConfiguration();
-		_tableDescriptor = BlurOutputFormat
-				.getTableDescriptor(_configuration);
-		int shardCount = _tableDescriptor.getShardCount();
-		int attemptId = context.getTaskAttemptID().getTaskID().getId();
-		int shardId = attemptId % shardCount;
-		Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
-		String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX,
-				shardId);
-		_indexPath = new Path(tableOutput, shardName);
-		_newIndex = new Path(_indexPath, context.getTaskAttemptID().toString() + ".tmp");
-	}
-
-	@Override
-	public void commitTask(
-			TaskAttemptContext context)
-			throws IOException {
-		setup(context);
-		FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
-		if (fileSystem.exists(_newIndex) && !fileSystem.isFile(_newIndex)) {
-			Path dst = new Path(_indexPath, context.getTaskAttemptID().toString().toString()
-					+ ".task_complete");
-			LOG.info("Committing [{0}] to [{1}]", _newIndex, dst);
-			fileSystem.rename(_newIndex, dst);
-		} else {
-			throw new IOException("Path [" + _newIndex
-					+ "] does not exist, can not commit.");
-		}
-	}
-
-	@Override
-	public boolean needsTaskCommit(
-			TaskAttemptContext context)
-			throws IOException {
-		int numReduceTasks = context.getNumReduceTasks();
-		TaskAttemptID taskAttemptID = context.getTaskAttemptID();
-		return taskAttemptID.isMap() && numReduceTasks != 0 ? false : true;
-	}
-
-	@Override
 	public void setupJob(JobContext context) throws IOException {
 		LOG.info("Running Job setup.");
 	}
-
-	@Override
-	public void setupTask(TaskAttemptContext context)
-			throws IOException {
-		LOG.info("Running Task setup.");
-	}
-
-}
+  
+}
\ No newline at end of file


Mime
View raw message