incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1329544 - in /incubator/hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
Date Tue, 24 Apr 2012 04:06:14 GMT
Author: edwardyoon
Date: Tue Apr 24 04:06:13 2012
New Revision: 1329544

URL: http://svn.apache.org/viewvc?rev=1329544&view=rev
Log:
Record Reader/Writer objects should be initialized

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1329544&r1=1329543&r2=1329544&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Apr 24 04:06:13 2012
@@ -16,6 +16,7 @@ Release 0.5 - April 10, 2012 
 
   IMPROVEMENTS
 
+    HAMA-562: Record Reader/Writer objects should be initialized (edwardyoon)
     HAMA-555: Separate bin and src distributions (edwardyoon)
     HAMA-548: Update 0.23.0-SNAPSHOT to 0.23.1 in pom file of yarn module (edwardyoon)
     HAMA-545: Include the API and other docs in the Hama release (Suraj Menon via edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1329544&r1=1329543&r2=1329544&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Apr 24
04:06:13 2012
@@ -52,10 +52,7 @@ public final class BSPPeerImpl<K1, V1, K
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
-    IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
-    TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
-    COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED,
MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
   }
 
   private final Configuration conf;
@@ -185,39 +182,40 @@ public final class BSPPeerImpl<K1, V1, K
 
     initInput();
 
-    // just output something when the user configured it
+    String outdir = null;
     if (conf.get("bsp.output.dir") != null) {
-      Path outdir = new Path(conf.get("bsp.output.dir"),
-          Task.getOutputName(partition));
-      outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob,
-          outdir.makeQualified(fs).toString());
-      final RecordWriter<K2, V2> finalOut = outWriter;
-
-      collector = new OutputCollector<K2, V2>() {
-        public void collect(K2 key, V2 value) throws IOException {
-          finalOut.write(key, value);
-        }
-      };
-    }
+      Path outputDir = new Path(conf.get("bsp.output.dir",
+          "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition));
+      outdir = outputDir.makeQualified(fs).toString();
+    }
+    outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir);
+    final RecordWriter<K2, V2> finalOut = outWriter;
+
+    collector = new OutputCollector<K2, V2>() {
+      public void collect(K2 key, V2 value) throws IOException {
+        finalOut.write(key, value);
+      }
+    };
 
   }
 
   @SuppressWarnings("unchecked")
   public final void initInput() throws IOException {
-    // just read input if the user defined one
-    if (conf.get("bsp.input.dir") != null) {
-      InputSplit inputSplit = null;
-      // reinstantiate the split
-      try {
+    InputSplit inputSplit = null;
+    // reinstantiate the split
+    try {
+      if (splitClass != null) {
         inputSplit = (InputSplit) ReflectionUtils.newInstance(
             getConfiguration().getClassByName(splitClass), getConfiguration());
-      } catch (ClassNotFoundException exp) {
-        IOException wrap = new IOException("Split class " + splitClass
-            + " not found");
-        wrap.initCause(exp);
-        throw wrap;
       }
+    } catch (ClassNotFoundException exp) {
+      IOException wrap = new IOException("Split class " + splitClass
+          + " not found");
+      wrap.initCause(exp);
+      throw wrap;
+    }
 
+    if (inputSplit != null) {
       DataInputBuffer splitBuffer = new DataInputBuffer();
       splitBuffer.reset(split.getBytes(), 0, split.getLength());
       inputSplit.readFields(splitBuffer);

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1329544&r1=1329543&r2=1329544&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Tue
Apr 24 04:06:13 2012
@@ -171,7 +171,7 @@ public class TestBSPTaskFaults extends T
   private class TestBSPTaskThreadRunner extends Thread {
 
     BSPJob job;
-
+    
     TestBSPTaskThreadRunner(BSPJob jobConf) {
       job = jobConf;
     }
@@ -325,6 +325,9 @@ public class TestBSPTaskFaults extends T
 
       try {
         BSPJob job = new BSPJob(hamaConf);
+        job.setInputFormat(NullInputFormat.class);
+        job.setOutputFormat(NullOutputFormat.class);
+        
         final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
             BSPPeerProtocol.class, BSPPeerProtocol.versionID,
             new InetSocketAddress("127.0.0.1", port), hamaConf);



Mime
View raw message