incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/5] git commit: Deleted AbstractOutputComitter
Date Fri, 26 Sep 2014 00:17:24 GMT
Deleted AbstractOutputComitter

Signed-off-by: Aaron McCurry <amccurry@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/2ce56fb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/2ce56fb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/2ce56fb5

Branch: refs/heads/master
Commit: 2ce56fb5b074e38c26ec3b6e3a76b35c8477057b
Parents: 5bc5aae
Author: Dibyendu Bhattacharya <dibyendu.bhattacharya1@pearson.com>
Authored: Wed Sep 24 15:02:20 2014 +0530
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Sep 25 20:06:30 2014 -0400

----------------------------------------------------------------------
 .../blur/mapred/AbstractOutputCommitter.java    | 148 -------------------
 1 file changed, 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2ce56fb5/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
deleted file mode 100644
index 8294738..0000000
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package org.apache.blur.mapred;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.TaskAttemptID;
-
-public abstract class AbstractOutputCommitter extends OutputCommitter {
-
-  private final static Log LOG = LogFactory.getLog(AbstractOutputCommitter.class);
-
-  @Override
-  public void setupJob(JobContext jobContext) throws IOException {
-
-  }
-
-  @Override
-  public void commitJob(JobContext jobContext) throws IOException {
-    // look through all the shards for attempts that need to be cleaned up.
-    // also find all the attempts that are finished
-    // then rename all the attempts jobs to commits
-    LOG.info("Commiting Job [{0}]", jobContext.getJobID());
-    Configuration configuration = jobContext.getConfiguration();
-    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
-    makeSureNoEmptyShards(configuration, tableOutput);
-    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
-      if (isShard(fileStatus)) {
-        commitOrAbortJob(jobContext, fileStatus.getPath(), true);
-      }
-    }
-
-  }
-
-  private void makeSureNoEmptyShards(Configuration configuration, Path tableOutput) throws IOException {
-    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    int shardCount = tableDescriptor.getShardCount();
-    for (int i = 0; i < shardCount; i++) {
-      String shardName = BlurUtil.getShardName(i);
-      fileSystem.mkdirs(new Path(tableOutput, shardName));
-    }
-  }
-
-  private void commitOrAbortJob(JobContext jobContext, Path shardPath, boolean commit) throws IOException {
-    FileSystem fileSystem = shardPath.getFileSystem(jobContext.getConfiguration());
-    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        if (path.getName().endsWith(".task_complete")) {
-          return true;
-        }
-        return false;
-      }
-    });
-    for (FileStatus fileStatus : listStatus) {
-      Path path = fileStatus.getPath();
-      String name = path.getName();
-      boolean taskComplete = name.endsWith(".task_complete");
-      if (fileStatus.isDir()) {
-        String taskAttemptName = getTaskAttemptName(name);
-        if (taskAttemptName == null) {
-          LOG.info("Dir name [{0}] not task attempt", name);
-          continue;
-        }
-        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptName);
-        if (taskAttemptID.getJobID().equals(jobContext.getJobID())) {
-          if (commit) {
-            if (taskComplete) {
-              fileSystem.rename(path, new Path(shardPath, taskAttemptName + ".commit"));
-              LOG.info("Committing [{0}] in path [{1}]", taskAttemptID, path);
-            } else {
-              fileSystem.delete(path, true);
-              LOG.info("Deleteing tmp dir [{0}] in path [{1}]", taskAttemptID, path);
-            }
-          } else {
-            fileSystem.delete(path, true);
-            LOG.info("Deleteing aborted job dir [{0}] in path [{1}]", taskAttemptID, path);
-          }
-        }
-      }
-    }
-  }
-
-  private String getTaskAttemptName(String name) {
-    int lastIndexOf = name.lastIndexOf('.');
-    if (lastIndexOf < 0) {
-      return null;
-    }
-    return name.substring(0, lastIndexOf);
-  }
-
-  private boolean isShard(FileStatus fileStatus) {
-    return isShard(fileStatus.getPath());
-  }
-
-  private boolean isShard(Path path) {
-    return path.getName().startsWith(BlurConstants.SHARD_PREFIX);
-  }
-
-  @Override
-  public void abortJob(JobContext jobContext, int status) throws IOException {
-    LOG.info("Abort Job [{0}]", jobContext.getJobID());
-    Configuration configuration = jobContext.getConfiguration();
-    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
-    makeSureNoEmptyShards(configuration, tableOutput);
-    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
-      if (isShard(fileStatus)) {
-        commitOrAbortJob(jobContext, fileStatus.getPath(), false);
-      }
-    }
-  }
-
-  @Override
-  public void cleanupJob(JobContext context) throws IOException {
-
-  }
-
-}


Mime
View raw message