hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1673611 - in /hama/trunk/core/src/main/java/org/apache/hama/bsp: BSPJobClient.java FileInputFormat.java LocalBSPRunner.java
Date Wed, 15 Apr 2015 01:43:42 GMT
Author: edwardyoon
Date: Wed Apr 15 01:43:42 2015
New Revision: 1673611

URL: http://svn.apache.org/r1673611
Log:
HAMA-949: File splits based on number of input files

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1673611&r1=1673610&r2=1673611&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Wed Apr 15 01:43:42 2015
@@ -349,8 +349,14 @@ public class BSPJobClient extends Config
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
 
-      InputSplit[] splits = job.getInputFormat().getSplits(job, maxTasks);
+      InputSplit[] splits = job.getInputFormat().getSplits(job, configured);
 
+      if (maxTasks < splits.length) {
+        throw new IOException(
+            "Job failed! The number of splits has exceeded the number of max tasks. The number
of splits: "
+                + splits.length + ", The number of max tasks: " + maxTasks);
+      }
+      
       /*
       job = partition(job, splits, maxTasks);
       maxTasks = job.getInt("hama.partition.count", maxTasks);
@@ -358,12 +364,6 @@ public class BSPJobClient extends Config
       if (job.getBoolean("input.has.partitioned", false)) {
         splits = job.getInputFormat().getSplits(job, maxTasks);
       }
-
-      if (maxTasks < splits.length) {
-        throw new IOException(
-            "Job failed! The number of splits has exceeded the number of max tasks. The number
of splits: "
-                + splits.length + ", The number of max tasks: " + maxTasks);
-      }
       */
 
       job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
@@ -631,10 +631,7 @@ public class BSPJobClient extends Config
     for (int i = 0; i < len; ++i) {
       RawSplit split = new RawSplit();
       split.readFields(in);
-      if (split.getPartitionID() != Integer.MIN_VALUE)
-        result[split.getPartitionID()] = split;
-      else
-        result[i] = split;
+      result[i] = split;
     }
     return result;
   }
@@ -1080,7 +1077,6 @@ public class BSPJobClient extends Config
     private String splitClass;
     private BytesWritable bytes = new BytesWritable();
     private String[] locations;
-    private int partitionID = Integer.MIN_VALUE;
     long dataLength;
 
     public void setBytes(byte[] data, int offset, int length) {
@@ -1091,14 +1087,6 @@ public class BSPJobClient extends Config
       splitClass = className;
     }
 
-    public void setPartitionID(int id) {
-      this.partitionID = id;
-    }
-
-    public int getPartitionID() {
-      return partitionID;
-    }
-
     public String getClassName() {
       return splitClass;
     }
@@ -1123,7 +1111,6 @@ public class BSPJobClient extends Config
     public void readFields(DataInput in) throws IOException {
       splitClass = Text.readString(in);
       dataLength = in.readLong();
-      partitionID = in.readInt();
       bytes.readFields(in);
       int len = WritableUtils.readVInt(in);
       locations = new String[len];
@@ -1136,7 +1123,6 @@ public class BSPJobClient extends Config
     public void write(DataOutput out) throws IOException {
       Text.writeString(out, splitClass);
       out.writeLong(dataLength);
-      out.writeInt(partitionID);
       bytes.write(out);
       WritableUtils.writeVInt(out, locations.length);
       for (String location : locations) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1673611&r1=1673610&r2=1673611&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Wed Apr 15 01:43:42 2015
@@ -178,6 +178,18 @@ public abstract class FileInputFormat<K,
     // generate splits
     List<InputSplit> splits = new ArrayList<InputSplit>();
     FileStatus[] files = listStatus(job);
+    
+    // take the short circuit path if we have already partitioned
+    if (numSplits == files.length) {
+      for (FileStatus file : files) {
+        if (file != null) {
+          splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
+              new String[0]));
+        }
+      }
+      return splits.toArray(new FileSplit[splits.size()]);
+    }
+    
     for (FileStatus file : files) {
       Path path = file.getPath();
       FileSystem fs = path.getFileSystem(job.getConfiguration());

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1673611&r1=1673610&r2=1673611&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Wed Apr 15 01:43:42 2015
@@ -242,7 +242,6 @@ public class LocalBSPRunner implements J
       String splitname = null;
       BytesWritable realBytes = null;
       if (splits != null) {
-        LOG.debug(id + ", " + splits[id].getPartitionID());
         splitname = splits[id].getClassName();
         realBytes = splits[id].getBytes();
       }



Mime
View raw message