incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [4/5] git commit: Formatting change only.
Date Fri, 26 Sep 2014 00:17:25 GMT
Formatting change only.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/74118af1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/74118af1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/74118af1

Branch: refs/heads/master
Commit: 74118af11b96d0769ad9e3bd0b183fe1ea2123cb
Parents: fe30c20
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Sep 25 20:16:49 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Sep 25 20:16:49 2014 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurOutputCommitter.java | 239 +++++++++----------
 1 file changed, 113 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/74118af1/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
index e1d94df..b5902bf 100644
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
-
 public class BlurOutputCommitter extends OutputCommitter {
 
   private static final Log LOG = LogFactory.getLog(BlurOutputCommitter.class);
@@ -55,7 +54,7 @@ public class BlurOutputCommitter extends OutputCommitter {
 
   @Override
   public void setupTask(TaskAttemptContext context) throws IOException {
-	LOG.info("Running Task setup.");
+    LOG.info("Running Task setup.");
   }
 
   @Override
@@ -92,128 +91,116 @@ public class BlurOutputCommitter extends OutputCommitter {
     _newIndex = new Path(_indexPath, _taskAttemptID.toString() + ".tmp");
   }
 
-	@Override
-	public void commitJob(JobContext jobContext) throws IOException {
-		// look through all the shards for attempts that need to be cleaned up.
-		// also find all the attempts that are finished
-		// then rename all the attempts jobs to commits
-		LOG.info("Commiting Job [{0}]", jobContext.getJobID());
-		Configuration configuration = jobContext.getConfiguration();
-		Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
-		LOG.info("TableOutput path [{0}]", tableOutput);
-		makeSureNoEmptyShards(configuration, tableOutput);
-		FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-		for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
-			LOG.info("Checking file status [{0}] with path [{1}]", fileStatus,
-					fileStatus.getPath());
-			if (isShard(fileStatus)) {
-				commitOrAbortJob(jobContext, fileStatus.getPath(), true);
-			}
-		}
-		LOG.info("Commiting Complete [{0}]", jobContext.getJobID());
-		super.commitJob(jobContext);
-	}
-
-	@Override
-	public void abortJob(JobContext jobContext, State state) throws IOException {
-		LOG.info("Abort Job [{0}]", jobContext.getJobID());
-		Configuration configuration = jobContext.getConfiguration();
-		Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
-		makeSureNoEmptyShards(configuration, tableOutput);
-		FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-		for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
-			if (isShard(fileStatus)) {
-				commitOrAbortJob(jobContext, fileStatus.getPath(), false);
-			}
-		}
-	}
-
-	private void commitOrAbortJob(JobContext jobContext, Path shardPath,
-			boolean commit) throws IOException {
-		LOG.info("CommitOrAbort [{0}] path [{1}]", commit, shardPath);
-		FileSystem fileSystem = shardPath.getFileSystem(jobContext
-				.getConfiguration());
-		FileStatus[] listStatus = fileSystem.listStatus(shardPath,
-				new PathFilter() {
-					@Override
-					public boolean accept(Path path) {
-						LOG.info("Checking path [{0}]", path);
-						if (path.getName().endsWith(".task_complete")) {
-							return true;
-						}
-						return false;
-					}
-				});
-		for (FileStatus fileStatus : listStatus) {
-			Path path = fileStatus.getPath();
-			LOG.info("Trying to commitOrAbort [{0}]", path);
-			String name = path.getName();
-			boolean taskComplete = name.endsWith(".task_complete");
-			if (fileStatus.isDir()) {
-				String taskAttemptName = getTaskAttemptName(name);
-				if (taskAttemptName == null) {
-					LOG.info("Dir name [{0}] not task attempt", name);
-					continue;
-				}
-				TaskAttemptID taskAttemptID = TaskAttemptID
-						.forName(taskAttemptName);
-				if (taskAttemptID.getJobID().toString().equals(jobContext.getJobID().toString())) {
-					if (commit) {
-						if (taskComplete) {
-							fileSystem.rename(path, new Path(shardPath,taskAttemptName + ".commit"));
-							LOG.info("Committing [{0}] in path [{1}]",
-									taskAttemptID, path);
-						} else {
-							fileSystem.delete(path, true);
-							LOG.info("Deleteing tmp dir [{0}] in path [{1}]",
-									taskAttemptID, path);
-						}
-					} else {
-						fileSystem.delete(path, true);
-						LOG.info(
-								"Deleteing aborted job dir [{0}] in path [{1}]",
-								taskAttemptID, path);
-					}
-				} else {
-					LOG.warn(
-							"TaskAttempt JobID [{0}] does not match JobContext JobId [{1}]",
-							taskAttemptID.getJobID(), jobContext.getJobID());
-				}
-			}
-		}
-	}
-
-	private String getTaskAttemptName(String name) {
-		int lastIndexOf = name.lastIndexOf('.');
-		if (lastIndexOf < 0) {
-			return null;
-		}
-		return name.substring(0, lastIndexOf);
-	}
-
-	private void makeSureNoEmptyShards(Configuration configuration,
-			Path tableOutput) throws IOException {
-		FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-		TableDescriptor tableDescriptor = BlurOutputFormat
-				.getTableDescriptor(configuration);
-		int shardCount = tableDescriptor.getShardCount();
-		for (int i = 0; i < shardCount; i++) {
-			String shardName = BlurUtil.getShardName(i);
-			fileSystem.mkdirs(new Path(tableOutput, shardName));
-		}
-	}
-
-	private boolean isShard(FileStatus fileStatus) {
-		return isShard(fileStatus.getPath());
-	}
-
-	private boolean isShard(Path path) {
-		return path.getName().startsWith(BlurConstants.SHARD_PREFIX);
-	}
-
-	@Override
-	public void setupJob(JobContext context) throws IOException {
-		LOG.info("Running Job setup.");
-	}
-  
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    // look through all the shards for attempts that need to be cleaned up.
+    // also find all the attempts that are finished
+    // then rename all the attempts jobs to commits
+    LOG.info("Commiting Job [{0}]", jobContext.getJobID());
+    Configuration configuration = jobContext.getConfiguration();
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    LOG.info("TableOutput path [{0}]", tableOutput);
+    makeSureNoEmptyShards(configuration, tableOutput);
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
+      LOG.info("Checking file status [{0}] with path [{1}]", fileStatus, fileStatus.getPath());
+      if (isShard(fileStatus)) {
+        commitOrAbortJob(jobContext, fileStatus.getPath(), true);
+      }
+    }
+    LOG.info("Commiting Complete [{0}]", jobContext.getJobID());
+    super.commitJob(jobContext);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, State state) throws IOException {
+    LOG.info("Abort Job [{0}]", jobContext.getJobID());
+    Configuration configuration = jobContext.getConfiguration();
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    makeSureNoEmptyShards(configuration, tableOutput);
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
+      if (isShard(fileStatus)) {
+        commitOrAbortJob(jobContext, fileStatus.getPath(), false);
+      }
+    }
+  }
+
+  private void commitOrAbortJob(JobContext jobContext, Path shardPath, boolean commit) throws IOException {
+    LOG.info("CommitOrAbort [{0}] path [{1}]", commit, shardPath);
+    FileSystem fileSystem = shardPath.getFileSystem(jobContext.getConfiguration());
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        LOG.info("Checking path [{0}]", path);
+        if (path.getName().endsWith(".task_complete")) {
+          return true;
+        }
+        return false;
+      }
+    });
+    for (FileStatus fileStatus : listStatus) {
+      Path path = fileStatus.getPath();
+      LOG.info("Trying to commitOrAbort [{0}]", path);
+      String name = path.getName();
+      boolean taskComplete = name.endsWith(".task_complete");
+      if (fileStatus.isDir()) {
+        String taskAttemptName = getTaskAttemptName(name);
+        if (taskAttemptName == null) {
+          LOG.info("Dir name [{0}] not task attempt", name);
+          continue;
+        }
+        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptName);
+        if (taskAttemptID.getJobID().toString().equals(jobContext.getJobID().toString())) {
+          if (commit) {
+            if (taskComplete) {
+              fileSystem.rename(path, new Path(shardPath, taskAttemptName + ".commit"));
+              LOG.info("Committing [{0}] in path [{1}]", taskAttemptID, path);
+            } else {
+              fileSystem.delete(path, true);
+              LOG.info("Deleteing tmp dir [{0}] in path [{1}]", taskAttemptID, path);
+            }
+          } else {
+            fileSystem.delete(path, true);
+            LOG.info("Deleteing aborted job dir [{0}] in path [{1}]", taskAttemptID, path);
+          }
+        } else {
+          LOG.warn("TaskAttempt JobID [{0}] does not match JobContext JobId [{1}]", taskAttemptID.getJobID(),
+              jobContext.getJobID());
+        }
+      }
+    }
+  }
+
+  private String getTaskAttemptName(String name) {
+    int lastIndexOf = name.lastIndexOf('.');
+    if (lastIndexOf < 0) {
+      return null;
+    }
+    return name.substring(0, lastIndexOf);
+  }
+
+  private void makeSureNoEmptyShards(Configuration configuration, Path tableOutput) throws IOException {
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    int shardCount = tableDescriptor.getShardCount();
+    for (int i = 0; i < shardCount; i++) {
+      String shardName = BlurUtil.getShardName(i);
+      fileSystem.mkdirs(new Path(tableOutput, shardName));
+    }
+  }
+
+  private boolean isShard(FileStatus fileStatus) {
+    return isShard(fileStatus.getPath());
+  }
+
+  private boolean isShard(Path path) {
+    return path.getName().startsWith(BlurConstants.SHARD_PREFIX);
+  }
+
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    LOG.info("Running Job setup.");
+  }
+
 }
\ No newline at end of file


Mime
View raw message