hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: edwardy...@apache.org
Subject: svn commit: r1391277 - in /hama/trunk/core/src: main/java/org/apache/hama/bsp/FileInputFormat.java test/java/org/apache/hama/bsp/TestFileInputFormat.java
Date: Fri, 28 Sep 2012 01:15:23 GMT
Author: edwardyoon
Date: Fri Sep 28 01:15:23 2012
New Revision: 1391277

URL: http://svn.apache.org/viewvc?rev=1391277&view=rev
Log:
Revert HAMA-647 patch.

Added:
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1391277&r1=1391276&r2=1391277&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Fri Sep 28 01:15:23 2012
@@ -175,37 +175,17 @@ public abstract class FileInputFormat<K,
   }
 
   /**
-   * Splits files returned by {@link #listStatus(JobConf)} when they're too big.
+   * Splits files returned by {@link #listStatus(BSPJob)} when they're too big.
    */
   @Override
   public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
     FileStatus[] files = listStatus(job);
 
-    long totalSize = 0; // compute total size
-    for (int i = 0; i < files.length; i++) { // check we have valid files
-      FileStatus file = files[i];
-      if (file.isDir()) {
-        final Path path = file.getPath();
-        if (path.getName().equals("hama-partitions")
-            || (job.get("bsp.partitioning.dir") != null && path.getName()
-                .equals(job.get("bsp.partitioning.dir")))) {
-          // if we find the partitioning dir, just remove it.
-          LOG.warn("Removing already existing partitioning directory " + path);
-          FileSystem fileSystem = path.getFileSystem(job.getConf());
-          if (!fileSystem.delete(path, true)) {
-            LOG.error("Remove failed.");
-          }
-          // remove this file from our initial list
-          files[i] = null;
-        } else {
-          throw new IOException("Not a file (dir): " + path);
-        }
-      }
-      totalSize += file.getLen();
-    }
+    long totalSize = computeTotalSize(job, files);
+    long goalSize = computeGoalSize(numSplits, totalSize);
 
     ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
-    long goalSize = 0;
+
     // take the short circuit path if we have already partitioned
     if (numSplits == files.length) {
       for (FileStatus file : files) {
@@ -215,13 +195,9 @@ public abstract class FileInputFormat<K,
         }
       }
       return splits.toArray(new FileSplit[splits.size()]);
-    } else if (files.length == 1) {
-      goalSize = totalSize / (numSplits == 0 ? 1 : numSplits - 1);
-    } else {
-      goalSize = totalSize
-          / (numSplits == 0 ? 1 : numSplits - files.length / 2 + 1);
     }
-    LOG.debug("numSplits: " + numSplits); 
+
+    LOG.info("numSplits: " + numSplits);
     long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
         minSplitSize);
 
@@ -268,6 +244,37 @@ public abstract class FileInputFormat<K,
     return splits.toArray(new FileSplit[splits.size()]);
   }
 
+  protected long computeTotalSize(BSPJob job, FileStatus[] files)
+      throws IOException {
+    long totalSize = 0L;
+    for (int i = 0; i < files.length; i++) { // check we have valid files
+      FileStatus file = files[i];
+      if (file.isDir()) {
+        final Path path = file.getPath();
+        if (path.getName().equals("hama-partitions")
+            || (job.get("bsp.partitioning.dir") != null && path.getName()
+                .equals(job.get("bsp.partitioning.dir")))) {
+          // if we find the partitioning dir, just remove it.
+          LOG.warn("Removing already existing partitioning directory " + path);
+          FileSystem fileSystem = path.getFileSystem(job.getConf());
+          if (!fileSystem.delete(path, true)) {
+            LOG.error("Remove failed.");
+          }
+          // remove this file from our initial list
+          files[i] = null;
+        } else {
+          throw new IOException("Not a file (dir): " + path);
+        }
+      }
+      totalSize += file.getLen();
+    }
+    return totalSize;
+  }
+
+  protected long computeGoalSize(int numSplits, long totalSize) {
+    return totalSize / (numSplits == 0 ? 1 : numSplits);
+  }
+
   protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
     if (goalSize > blockSize) {
       return Math.max(minSize, Math.max(goalSize, blockSize));

Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java?rev=1391277&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java Fri Sep 28 01:15:23 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import junit.framework.TestCase;
+
+public class TestFileInputFormat extends TestCase {
+
+  public void testComputeGoalSize() throws Exception {
+
+    TextInputFormat input = new TextInputFormat();
+    assertEquals(1000, input.computeGoalSize(10, 10000));
+    
+  }
+}



Mime
View raw message