hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From millec...@apache.org
Subject svn commit: r1558725 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/pipes/ core/src/main/java/org/apache/hama/pipes/protocol/ core/src/test/java/org/apache/hama/pipes/
Date Thu, 16 Jan 2014 09:39:26 GMT
Author: millecker
Date: Thu Jan 16 09:39:26 2014
New Revision: 1558725

URL: http://svn.apache.org/r1558725
Log:
HAMA-852: Add MessageClass property in BSPJob

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
    hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Jan 16 09:39:26 2014
@@ -23,6 +23,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-852: Add MessageClass property in BSPJob (Martin Illecker)
    HAMA-843: Message communication overhead between master aggregation and vertex computation supersteps (edwardyoon)
    HAMA-838: Refactor aggregators (Anastasis Andronidis)
    HAMA-783: Improve the InMemory verticesInfo implementations (edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Thu Jan 16 09:39:26 2014
@@ -101,7 +101,7 @@ public interface Constants {
   public static final String JOB_PEERS_COUNT = "bsp.peers.num";
   public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class";
   public static final String OUTPUT_FORMAT_CLASS = "bsp.output.format.class";
-  public static final String MESSAGE_CLASS = "bsp.message.type.class";
+  public static final String MESSAGE_CLASS = "bsp.message.class";
 
   // /////////////////////////////////////////////
   // Messaging related parameters.
@@ -120,7 +120,7 @@ public interface Constants {
   public static final String RUNTIME_PARTITIONING_CLASS = "bsp.input.partitioner.class";
   public static final String RUNTIME_DESIRED_PEERS_COUNT = "desired.num.of.tasks";
   public static final String RUNTIME_PARTITION_RECORDCONVERTER = "bsp.runtime.partition.recordconverter";
-  
+
   public static final String PARTITION_SORT_BY_KEY = "bsp.partition.sort.by.converted.record";
  
 
   // /////////////////////////////////////

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Thu Jan 16 09:39:26 2014
@@ -356,6 +356,24 @@ public class BSPJob extends BSPJobContex
   }
 
   /**
+   * Get the message class.
+   * 
+   * @return the message class.
+   */
+  public Class<?> getMessageClass() {
+    return conf.getClass(Constants.MESSAGE_CLASS, Text.class, Object.class);
+  }
+
+  /**
+   * Set the message class.
+   * 
+   * @param theClass the message class.
+   */
+  public void setMessageClass(Class<?> theClass) {
+    conf.setClass(Constants.MESSAGE_CLASS, theClass, Object.class);
+  }
+
+  /**
    * Sets the output path for the job.
    * 
    * @param path where the output gets written.

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Thu Jan 16 09:39:26 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
@@ -191,7 +192,7 @@ public class Submitter implements Tool {
     setIfUnset(job.getConfiguration(), "bsp.input.value.class", textClassname);
     setIfUnset(job.getConfiguration(), "bsp.output.key.class", textClassname);
     setIfUnset(job.getConfiguration(), "bsp.output.value.class", textClassname);
-    setIfUnset(job.getConfiguration(), "bsp.message.class",
+    setIfUnset(job.getConfiguration(), Constants.MESSAGE_CLASS,
         BytesWritable.class.getName());
 
     setIfUnset(job.getConfiguration(), "bsp.job.name", "Hama Pipes Job");
@@ -205,7 +206,7 @@ public class Submitter implements Tool {
     LOG.debug("InputFormat: " + job.getOutputFormat());
     LOG.debug("OutputKeyClass: " + job.getOutputKeyClass().getName());
     LOG.debug("OutputValueClass: " + job.getOutputValueClass().getName());
-    LOG.debug("MessageClass: " + job.get("bsp.message.class"));
+    LOG.debug("MessageClass: " + job.get(Constants.MESSAGE_CLASS));
 
     LOG.debug("bsp.master.address: "
         + job.getConfiguration().get("bsp.master.address"));

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Thu Jan 16 09:39:26 2014
@@ -354,7 +354,7 @@ public class UplinkReader<KEYIN, VALUEIN
     String peerName = Text.readString(this.inStream);
 
     M message = (M) ReflectionUtils.newInstance((Class<? extends M>) conf
-        .getClass("bsp.message.class", BytesWritable.class), conf);
+        .getClass(Constants.MESSAGE_CLASS, BytesWritable.class), conf);
 
     LOG.debug("Got MessageType.SEND_MSG peerName: " + peerName
         + " messageClass: " + message.getClass().getName());

Modified: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1558725&r1=1558724&r2=1558725&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Thu Jan 16 09:39:26 2014
@@ -223,7 +223,7 @@ public class TestPipes extends HamaClust
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
     bsp.setOutputKeyClass(NullWritable.class);
     bsp.setOutputValueClass(DoubleWritable.class);
-    bsp.set("bsp.message.class", DoubleWritable.class.getName());
+    bsp.setMessageClass(DoubleWritable.class);
     return bsp;
   }
 
@@ -233,7 +233,7 @@ public class TestPipes extends HamaClust
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
     bsp.setOutputKeyClass(NullWritable.class);
     bsp.setOutputValueClass(DoubleWritable.class);
-    bsp.set("bsp.message.class", IntWritable.class.getName());
+    bsp.setMessageClass(IntWritable.class);
     return bsp;
   }
 
@@ -246,10 +246,9 @@ public class TestPipes extends HamaClust
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
     bsp.setOutputKeyClass(IntWritable.class);
     bsp.setOutputValueClass(PipesVectorWritable.class);
+    bsp.setMessageClass(PipesKeyValueWritable.class);
 
     bsp.set(Constants.RUNTIME_PARTITIONING_DIR, HAMA_TMP_OUTPUT + "/parts");
-    bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName());
-
     bsp.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
     bsp.setPartitioner(PipesPartitioner.class);
 



Mime
View raw message