incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [19/20] git commit: Fixed BLUR-188
Date Mon, 05 Aug 2013 18:56:56 GMT
Fixed BLUR-188


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/76bf31e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/76bf31e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/76bf31e1

Branch: refs/heads/0.2.0-newtypesystem
Commit: 76bf31e1ea7fc6b06dee8052e4008328e6ecbaf4
Parents: dd06745
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Aug 5 14:51:41 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Aug 5 14:51:41 2013 -0400

----------------------------------------------------------------------
 .../blur/mapred/AbstractOutputCommitter.java    | 34 ++++++++++---
 .../mapreduce/lib/BlurOutputFormatTest.java     | 52 ++++++++++++++++++++
 2 files changed, 79 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/76bf31e1/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java b/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
index 22723a5..4f110e2 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
@@ -53,7 +53,7 @@ public abstract class AbstractOutputCommitter extends OutputCommitter {
     FileSystem fileSystem = tableOutput.getFileSystem(configuration);
     for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
       if (isShard(fileStatus)) {
-        commitJob(jobContext, fileStatus.getPath());
+        commitOrAbortJob(jobContext, fileStatus.getPath(), true);
       }
     }
 
@@ -69,18 +69,29 @@ public abstract class AbstractOutputCommitter extends OutputCommitter {
     }
   }
 
-  private void commitJob(JobContext jobContext, Path shardPath) throws IOException {
+  private void commitOrAbortJob(JobContext jobContext, Path shardPath, boolean commit) throws IOException {
     FileSystem fileSystem = shardPath.getFileSystem(jobContext.getConfiguration());
     FileStatus[] listStatus = fileSystem.listStatus(shardPath);
     for (FileStatus fileStatus : listStatus) {
       Path path = fileStatus.getPath();
       String name = path.getName();
-      if (fileStatus.isDir() && name.endsWith(".task_complete")) {
+      boolean taskComplete = name.endsWith(".task_complete");
+      if (fileStatus.isDir()) {
         String taskAttemptName = getTaskAttemptName(name);
         TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptName);
         if (taskAttemptID.getJobID().equals(jobContext.getJobID())) {
-          fileSystem.rename(path, new Path(shardPath, taskAttemptName + ".commit"));
-          LOG.info("Committing [{0}] in path [{1}]", taskAttemptID, path);
+          if (commit) {
+            if (taskComplete) {
+              fileSystem.rename(path, new Path(shardPath, taskAttemptName + ".commit"));
+              LOG.info("Committing [{0}] in path [{1}]", taskAttemptID, path);
+            } else {
+              fileSystem.delete(path, true);
+              LOG.info("Deleting tmp dir [{0}] in path [{1}]", taskAttemptID, path);
+            }
+          } else {
+            fileSystem.delete(path, true);
+            LOG.info("Deleting aborted job dir [{0}] in path [{1}]", taskAttemptID, path);
+          }
         }
       }
     }
@@ -101,12 +112,21 @@ public abstract class AbstractOutputCommitter extends OutputCommitter {
 
   @Override
   public void abortJob(JobContext jobContext, int status) throws IOException {
-    System.out.println("abortJob");
+    LOG.info("Abort Job [{0}] with status [{1}]", jobContext.getJobID(), status);
+    Configuration configuration = jobContext.getConfiguration();
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    makeSureNoEmptyShards(configuration, tableOutput);
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
+      if (isShard(fileStatus)) {
+        commitOrAbortJob(jobContext, fileStatus.getPath(), false);
+      }
+    }
   }
 
   @Override
   public void cleanupJob(JobContext context) throws IOException {
-    System.out.println("cleanupJob");
+    // Intentionally empty: the commit and abort paths already rename or delete task directories.
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/76bf31e1/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index 937ac89..96d55ab 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -17,6 +17,7 @@ package org.apache.blur.mapreduce.lib;
  * limitations under the License.
  */
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedReader;
@@ -310,6 +311,57 @@ public class BlurOutputFormatTest {
 
   }
 
+  @Test
+  public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 5000, 2000, 100, "cf1"); // 100 * 5000 =
+                                                             // 500,000
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(tableUri);
+
+    createShardDirectories(outDir, 2);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, false);
+
+    job.submit();
+    boolean killCalled = false;
+    while (!job.isComplete()) {
+      Thread.sleep(1000);
+      System.out.printf("Killed [%b] Map [%f] Reduce [%f]%n", killCalled, job.mapProgress() * 100,
+          job.reduceProgress() * 100);
+      if (job.reduceProgress() > 0.7 && !killCalled) {
+        job.killJob();
+        killCalled = true;
+      }
+    }
+
+    assertFalse(job.isSuccessful());
+
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      assertEquals(0, listStatus.length);
+    }
+  }
+
   public static String readFile(String name) throws IOException {
     DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
     BufferedReader b = new BufferedReader(new InputStreamReader(f));


Mime
View raw message