hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1448523 [3/4] - in /hama/trunk: ./ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/bsp/message/compress/ core/src/main/java/org/apach...
Date Thu, 21 Feb 2013 06:38:36 GMT
Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java Thu Feb 21 06:38:33 2013
@@ -21,7 +21,7 @@ package org.apache.hama.monitor.fd;
 import java.io.IOException;
 
 /**
- * Failure detector client, sending heartbeat to supervisor. 
+ * Failure detector client, sending heartbeat to supervisor.
  */
 public interface Sensor {
 
@@ -40,5 +40,4 @@ public interface Sensor {
    */
   void stop();
 
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java Thu Feb 21 06:38:33 2013
@@ -19,17 +19,18 @@
 package org.apache.hama.monitor.fd;
 
 /**
- * A failure detector component. It is responsible for receiving the 
- * heartbeat and output suspicion level for Interpreter.
+ * A failure detector component. It is responsible for receiving the heartbeat
+ * and output suspicion level for Interpreter.
  */
 public interface Supervisor {
 
   /**
    * Receive notification if a node fails.
+   * 
    * @param listener will be called if a node fails.
    */
   void register(NodeEventListener listener);
-  
+
   /**
    * Start supervisor.
    */

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java Thu Feb 21 06:38:33 2013
@@ -38,10 +38,10 @@ import org.apache.hama.HamaConfiguration
 public class UDPSensor implements Sensor, Callable<Object> {
 
   public static final Log LOG = LogFactory.getLog(UDPSensor.class);
-  /** 
+  /**
    * The default interval hearbeat.
    */
-  private static long HEARTBEAT_INTERVAL; 
+  private static long HEARTBEAT_INTERVAL;
 
   /* UDP server host and port */
   private String host;
@@ -53,95 +53,94 @@ public class UDPSensor implements Sensor
   private final ExecutorService scheduler;
 
   /**
-   * Constructor for UDP client. Setting up configuration 
-   * and open DatagramSocket.
+   * Constructor for UDP client. Setting up configuration and open
+   * DatagramSocket.
    */
-  public UDPSensor(HamaConfiguration configuration){
+  public UDPSensor(HamaConfiguration configuration) {
     this.host = configuration.get("bsp.monitor.fd.udp_host", "localhost");
     this.port = configuration.getInt("bsp.monitor.fd.udp_port", 16384);
-    HEARTBEAT_INTERVAL = 
-      configuration.getInt("bsp.monitor.fd.heartbeat_interval", 1000);
+    HEARTBEAT_INTERVAL = configuration.getInt(
+        "bsp.monitor.fd.heartbeat_interval", 1000);
     DatagramChannel tmp = null;
-    try{
+    try {
       tmp = DatagramChannel.open();
-    }catch(IOException ioe){
+    } catch (IOException ioe) {
       LOG.error("Unable to open datagram channel.", ioe);
     }
     this.channel = tmp;
-    if(null == this.channel)
+    if (null == this.channel)
       throw new NullPointerException("Fail to open udp channel.");
     this.scheduler = Executors.newSingleThreadExecutor();
-  } 
-
+  }
 
   /**
    * The heartbeat function, signifying its existence.
    */
   @Override
-  public void heartbeat() throws IOException{
+  public void heartbeat() throws IOException {
     ByteBuffer heartbeat = ByteBuffer.allocate(8);
     heartbeat.clear();
-    heartbeat.putLong(sequence.incrementAndGet()); 
+    heartbeat.putLong(sequence.incrementAndGet());
     heartbeat.flip();
     channel.send(heartbeat, new InetSocketAddress(this.host, this.port));
-    if(LOG.isDebugEnabled()){
-      LOG.debug("Heartbeat sequence "+sequence.get()+ " is sent to "+this.host+
-      ":"+ this.port);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Heartbeat sequence " + sequence.get() + " is sent to "
+          + this.host + ":" + this.port);
     }
   }
 
-  public String getHost(){
+  public String getHost() {
     return this.host;
   }
-  
-  public int getPort(){
+
+  public int getPort() {
     return this.port;
   }
 
-  public long heartbeatInterval(){
+  public long heartbeatInterval() {
     return HEARTBEAT_INTERVAL;
   }
 
   @Override
   public Object call() throws Exception {
-    while(running.get()){
-      try{
+    while (running.get()) {
+      try {
         heartbeat();
         Thread.sleep(HEARTBEAT_INTERVAL);
-      }catch(InterruptedException ie){
+      } catch (InterruptedException ie) {
         LOG.error("UDPSensor is interrupted.", ie);
         Thread.currentThread().interrupt();
-      }catch(IOException ioe){ 
+      } catch (IOException ioe) {
         LOG.error("Sensor fails in sending heartbeat.", ioe);
       }
     }
-    LOG.info("Sensor at "+this.host+" stops sending heartbeat.");
+    LOG.info("Sensor at " + this.host + " stops sending heartbeat.");
     return null;
   }
 
   @Override
   public void start() {
-    if(!running.compareAndSet(false, true)) {
-      throw new IllegalStateException("Sensor is already started."); 
+    if (!running.compareAndSet(false, true)) {
+      throw new IllegalStateException("Sensor is already started.");
     }
     this.scheduler.submit(this);
   }
 
   @Override
-  public void stop(){
+  public void stop() {
     running.set(false);
-    if(null != this.channel) {
-      try{ 
+    if (null != this.channel) {
+      try {
         this.channel.socket().close();
-        this.channel.close(); 
-      }catch(IOException ioe){ 
-        LOG.error("Error closing sensor channel.",ioe); 
+        this.channel.close();
+      } catch (IOException ioe) {
+        LOG.error("Error closing sensor channel.", ioe);
       }
     }
     this.scheduler.shutdown();
   }
 
-  public boolean isShutdown(){
+  public boolean isShutdown() {
     return this.channel.socket().isClosed() && !running.get();
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java Thu Feb 21 06:38:33 2013
@@ -298,7 +298,8 @@ public class BinaryProtocol<K1 extends W
     }
 
     public void readKeyValue() throws IOException {
-      boolean nullinput = peer.getConfiguration().get(Constants.INPUT_FORMAT_CLASS) == null
+      boolean nullinput = peer.getConfiguration().get(
+          Constants.INPUT_FORMAT_CLASS) == null
           || peer.getConfiguration().get(Constants.INPUT_FORMAT_CLASS)
               .equals("org.apache.hama.bsp.NullInputFormat");
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Thu Feb 21 06:38:33 2013
@@ -232,7 +232,8 @@ public class Submitter implements Tool {
 
     setIfUnset(job.getConfiguration(), "bsp.job.name", "Hama Pipes Job");
 
-    LOG.debug("isJavaRecordReader: " + getIsJavaRecordReader(job.getConfiguration()));
+    LOG.debug("isJavaRecordReader: "
+        + getIsJavaRecordReader(job.getConfiguration()));
     LOG.debug("BspClass: " + job.getBspClass().getName());
     // conf.setInputFormat(NLineInputFormat.class);
     LOG.debug("InputFormat: " + job.getInputFormat());
@@ -246,11 +247,13 @@ public class Submitter implements Tool {
       throw new IllegalArgumentException(
           "Hama Pipes does only support Text as Key/Value output!");
 
-    LOG.debug("bsp.master.address: " + job.getConfiguration().get("bsp.master.address"));
+    LOG.debug("bsp.master.address: "
+        + job.getConfiguration().get("bsp.master.address"));
     LOG.debug("bsp.local.tasks.maximum: "
         + job.getConfiguration().get("bsp.local.tasks.maximum"));
     LOG.debug("NumBspTask: " + job.getNumBspTask());
-    LOG.debug("fs.default.name: " + job.getConfiguration().get("fs.default.name"));
+    LOG.debug("fs.default.name: "
+        + job.getConfiguration().get("fs.default.name"));
 
     String exec = getExecutable(job.getConfiguration());
     if (exec == null) {
@@ -464,7 +467,8 @@ public class Submitter implements Tool {
       if (results.hasOption("programArgs")) {
         job.getConfiguration().set("hama.pipes.executable.args",
             Joiner.on(" ").join(results.getOptionValues("programArgs")));
-        // job.getConfiguration().set("hama.pipes.resolve.executable.args", "true");
+        // job.getConfiguration().set("hama.pipes.resolve.executable.args",
+        // "true");
       }
 
       if (results.hasOption("cachefiles")) {
@@ -475,7 +479,8 @@ public class Submitter implements Tool {
           FileStatus[] globStatus = fs.globStatus(path);
           for (FileStatus f : globStatus) {
             if (!f.isDir()) {
-              DistributedCache.addCacheFile(f.getPath().toUri(), job.getConfiguration());
+              DistributedCache.addCacheFile(f.getPath().toUri(),
+                  job.getConfiguration());
             } else {
               LOG.info("Ignoring directory " + f.getPath() + " while globbing.");
             }

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java Thu Feb 21 06:38:33 2013
@@ -17,13 +17,17 @@
  */
 package org.apache.hama.util;
 
-import org.apache.hama.Constants;
-import org.apache.mina.util.AvailablePortFinder;
-
 import java.io.IOException;
-import java.net.*;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
 import java.util.NoSuchElementException;
 
+import org.apache.hama.Constants;
+import org.apache.mina.util.AvailablePortFinder;
+
 /**
  * NetUtils for our needs.
  */
@@ -67,7 +71,8 @@ public class BSPNetUtils {
    * Gets a new InetSocketAddress from the given peerName. peerName must contain
    * a colon to distinct between host and port.
    * 
-   * @param peerName the name as a String of the BSP peer to get the address from
+   * @param peerName the name as a String of the BSP peer to get the address
+   *          from
    * @return the InetSocketAddress of the given BSP peer
    */
   public static InetSocketAddress getAddress(String peerName) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java Thu Feb 21 06:38:33 2013
@@ -17,17 +17,17 @@
  */
 package org.apache.hama.util;
 
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Map.Entry;
+
 import org.apache.hadoop.util.ServletUtil;
 import org.apache.hama.bsp.BSPMaster;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.GroomServerStatus;
 import org.apache.hama.bsp.JobStatus;
 
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Map.Entry;
-
 public class BSPServletUtil extends ServletUtil {
 
   public static final String HTML_TAIL = "<hr />\n"
@@ -64,8 +64,8 @@ public class BSPServletUtil extends Serv
           + "<th>SuperSteps</th>" + "<th>Tasks</th>" + "<th>Starttime</th>"
           + "</tr>\n");
       for (JobStatus status : jobs) {
-        sb.append("<tr><td><a href=\"bspjob.jsp?jobid=").append(
-            status.getJobID()).append("\">");
+        sb.append("<tr><td><a href=\"bspjob.jsp?jobid=")
+            .append(status.getJobID()).append("\">");
         sb.append(status.getJobID());
         sb.append("</a></td><td>");
         sb.append(status.getUsername());
@@ -92,21 +92,20 @@ public class BSPServletUtil extends Serv
     StringBuilder sb = new StringBuilder();
     sb.append("<center>\n");
     sb.append("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
-    sb
-        .append("<tr><td align=\"center\" colspan=\"6\"><b>Groom Servers</b></td></tr>\n");
-    sb
-        .append("<tr><td><b>Name</b></td>"
-            + "<td><b>Host</b></td>"
-            + "<td><b># maximum tasks</b></td><td><b># current running tasks</b></td>"
-            + "<td><b># current failures</b></td>"
-            + "<td><b>Last seen</b></td>" + "</tr>\n");
+    sb.append("<tr><td align=\"center\" colspan=\"6\"><b>Groom Servers</b></td></tr>\n");
+    sb.append("<tr><td><b>Name</b></td>"
+        + "<td><b>Host</b></td>"
+        + "<td><b># maximum tasks</b></td><td><b># current running tasks</b></td>"
+        + "<td><b># current failures</b></td>" + "<td><b>Last seen</b></td>"
+        + "</tr>\n");
     for (Entry<String, GroomServerStatus> entry : status
         .getActiveGroomServerStatus().entrySet()) {
       sb.append("<tr><td>");
       sb.append("<a href='http://").append(entry.getKey()).append("'>");
       sb.append(entry.getKey()).append("</a></td><td>");
-      sb.append(entry.getValue().getGroomHostName()).append("</td>").append(
-          "<td>").append(entry.getValue().getMaxTasks()).append("</td><td>");
+      sb.append(entry.getValue().getGroomHostName()).append("</td>")
+          .append("<td>").append(entry.getValue().getMaxTasks())
+          .append("</td><td>");
       sb.append(entry.getValue().countTasks()).append("</td><td>");
       sb.append(entry.getValue().getFailures()).append("</td><td>");
       sb.append(entry.getValue().getLastSeen()).append("</td>");

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java Thu Feb 21 06:38:33 2013
@@ -17,13 +17,6 @@
  */
 package org.apache.hama.util;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.Constants;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -32,6 +25,13 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.Constants;
+
 /**
  * Utility class that handles byte arrays, conversions to/from other types,
  * comparisons, hash code generation, manufacturing keys for HashMaps or
@@ -892,7 +892,10 @@ public class Bytes {
   public static boolean equals(final byte[] left, final byte[] right) {
     // Could use Arrays.equals?
     // noinspection SimplifiableConditionalExpression
-    return left == null && right == null || (!(left == null || right == null || (left.length != right.length)) && compareTo(left, right) == 0);
+    return left == null
+        && right == null
+        || (!(left == null || right == null || (left.length != right.length)) && compareTo(
+            left, right) == 0);
   }
 
   /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/ClusterUtil.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/ClusterUtil.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/ClusterUtil.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/ClusterUtil.java Thu Feb 21 06:38:33 2013
@@ -17,6 +17,9 @@
  */
 package org.apache.hama.util;
 
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -24,12 +27,9 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.BSPMaster;
 import org.apache.hama.bsp.GroomServer;
 
-import java.io.IOException;
-import java.util.List;
-
 public class ClusterUtil {
   private static final Log LOG = LogFactory.getLog(ClusterUtil.class);
-  
+
   /**
    * Data Structure to hold GroomServer Thread and GroomServer instance
    */
@@ -47,8 +47,8 @@ public class ClusterUtil {
     }
 
     /**
-     * Block until the groom server has come online, indicating it is ready
-     * to be used.
+     * Block until the groom server has come online, indicating it is ready to
+     * be used.
      */
     public void waitForServerOnline() {
       while (!groomServer.isRunning()) {
@@ -62,55 +62,58 @@ public class ClusterUtil {
   }
 
   /**
-   * Creates a {@link GroomServerThread}.
-   * Call 'start' on the returned thread to make it run.
+   * Creates a {@link GroomServerThread}. Call 'start' on the returned thread to
+   * make it run.
+   * 
    * @param c Configuration to use.
    * @param hrsc Class to create.
    * @param index Used distingushing the object returned.
    * @throws IOException
    * @return Groom server added.
    */
-  public static ClusterUtil.GroomServerThread createGroomServerThread(final Configuration c,
-    final Class<? extends GroomServer> hrsc, final int index)
-  throws IOException {
+  public static ClusterUtil.GroomServerThread createGroomServerThread(
+      final Configuration c, final Class<? extends GroomServer> hrsc,
+      final int index) throws IOException {
     GroomServer server;
-      try {
-        server = hrsc.getConstructor(Configuration.class).newInstance(c);
-      } catch (Exception e) {
-        IOException ioe = new IOException();
-        ioe.initCause(e);
-        throw ioe;
-      }
-      return new ClusterUtil.GroomServerThread(server, index);
+    try {
+      server = hrsc.getConstructor(Configuration.class).newInstance(c);
+    } catch (Exception e) {
+      IOException ioe = new IOException();
+      ioe.initCause(e);
+      throw ioe;
+    }
+    return new ClusterUtil.GroomServerThread(server, index);
   }
 
   /**
    * Start the cluster.
+   * 
    * @param m the BSP master
    * @param conf cluster configuration to be used
    * @param groomservers list of threads holding groom servers
    * @return Address to use contacting master.
-   * @throws InterruptedException 
-   * @throws IOException 
+   * @throws InterruptedException
+   * @throws IOException
    */
   public static String startup(final BSPMaster m,
-      final List<ClusterUtil.GroomServerThread> groomservers, Configuration conf) throws IOException, InterruptedException {
+      final List<ClusterUtil.GroomServerThread> groomservers, Configuration conf)
+      throws IOException, InterruptedException {
     if (m != null) {
       BSPMaster.startMaster((HamaConfiguration) conf);
     }
 
     if (groomservers != null) {
-      for (ClusterUtil.GroomServerThread t: groomservers) {
+      for (ClusterUtil.GroomServerThread t : groomservers) {
         t.start();
       }
     }
-    
-    return m == null? null: BSPMaster.getAddress(conf).getHostName();
+
+    return m == null ? null : BSPMaster.getAddress(conf).getHostName();
   }
 
   public static void shutdown(BSPMaster master,
       List<GroomServerThread> groomThreads, Configuration conf) {
     LOG.debug("Shutting down HAMA Cluster");
-    // TODO: 
+    // TODO:
   }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/LRUCache.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/LRUCache.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/LRUCache.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/LRUCache.java Thu Feb 21 06:38:33 2013
@@ -30,7 +30,7 @@ import java.util.Map;
 public abstract class LRUCache<K, V> extends LinkedHashMap<K, V> {
 
   private static final long serialVersionUID = -3347750474082019514L;
-  
+
   /**
    * The maximum size of the cache.
    */

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/RandomVariable.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/RandomVariable.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/RandomVariable.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/RandomVariable.java Thu Feb 21 06:38:33 2013
@@ -24,7 +24,7 @@ package org.apache.hama.util;
  * numbers.
  */
 public class RandomVariable {
-  
+
   /**
    * Generate a random number between 0 and 1.
    * 
@@ -45,22 +45,21 @@ public class RandomVariable {
     double x = rand();
     return i0 + (int) Math.floor((i1 - i0 + 1) * x);
   }
-  
+
   /**
-    * Generate a random string using the specified prefix and a fixed length. 
-    * @param prefix
-    *        the specified string prefix.
-    * @param length
-    *        the length of the string to be appended.
-    * @return random string.
-    */
+   * Generate a random string using the specified prefix and a fixed length.
+   * 
+   * @param prefix the specified string prefix.
+   * @param length the length of the string to be appended.
+   * @return random string.
+   */
   public static String randString(String prefix, int length) {
     StringBuilder result = new StringBuilder(prefix);
     for (int i = 0; i < length; i++) {
       char ch = (char) ((Math.random() * 26) + 97);
       result.append(ch);
     }
-      
+
     return result.toString();
   }
 
@@ -156,8 +155,8 @@ public class RandomVariable {
    * @return a double.
    */
   public static double triangular(double min, double max) {
-    return min / 2 + (max - min) * rand() / 2 + min / 2 + (max - min)
-        * rand() / 2;
+    return min / 2 + (max - min) * rand() / 2 + min / 2 + (max - min) * rand()
+        / 2;
   }
 
   /**
@@ -170,9 +169,9 @@ public class RandomVariable {
    */
   public static double triangular(double min, double med, double max) {
     double y = rand();
-    return (y < ((med - min) / (max - min))) ? (min + Math.sqrt(y
-        * (max - min) * (med - min))) : (max - Math.sqrt((1 - y) * (max - min)
-        * (max - med)));
+    return (y < ((med - min) / (max - min))) ? (min + Math.sqrt(y * (max - min)
+        * (med - min)))
+        : (max - Math.sqrt((1 - y) * (max - min) * (max - med)));
   }
 
   /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java Thu Feb 21 06:38:33 2013
@@ -20,9 +20,12 @@ package org.apache.hama.util;
 import java.lang.reflect.Constructor;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Refelction utility for BSP programmes.
  */
@@ -35,9 +38,14 @@ public class ReflectionUtils {
   @SuppressWarnings("unchecked")
   public static <T> T newInstance(String className)
       throws ClassNotFoundException {
+    return newInstance((Class<T>) Class.forName(className));
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> T newInstance(Class<T> theClass) {
+    Preconditions.checkNotNull(theClass);
     T result;
     try {
-      Class<T> theClass = (Class<T>) Class.forName(className);
       Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
       if (null == meth) {
         meth = theClass.getDeclaredConstructor(new Class[0]);

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/RunJar.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/RunJar.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/RunJar.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/RunJar.java Thu Feb 21 06:38:33 2013
@@ -75,8 +75,8 @@ public class RunJar {
   }
 
   /**
-   * Run a Hama job jar. If the main class is not in the jar's manifest, then
-   * it must be provided on the command line.
+   * Run a Hama job jar. If the main class is not in the jar's manifest, then it
+   * must be provided on the command line.
    */
   public static void main(String[] args) throws Throwable {
     String usage = "Usage: hama jar <jar> [mainClass] args...";
@@ -133,15 +133,14 @@ public class RunJar {
         classPath.add(lib.toURI().toURL());
       }
     }
-    ClassLoader loader = new URLClassLoader(classPath
-      .toArray(new URL[classPath.size()]));
+    ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[classPath
+        .size()]));
 
     Thread.currentThread().setContextClassLoader(loader);
     Class<?> mainClass = loader.loadClass(mainClassName);
-    Method main = mainClass.getMethod("main", new Class[] { Array.newInstance(
-        String.class, 0).getClass() });
-    List<String> var = Arrays.asList(args).subList(firstArg,
-      args.length);
+    Method main = mainClass.getMethod("main",
+        new Class[] { Array.newInstance(String.class, 0).getClass() });
+    List<String> var = Arrays.asList(args).subList(firstArg, args.length);
     String[] newArgs = var.toArray(new String[var.size()]);
     try {
       main.invoke(null, new Object[] { newArgs });

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/StringArrayWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/StringArrayWritable.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/StringArrayWritable.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/StringArrayWritable.java Thu Feb 21 06:38:33 2013
@@ -62,4 +62,4 @@ public class StringArrayWritable impleme
     }
   }
 
-}
\ No newline at end of file
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/VersionInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/VersionInfo.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/VersionInfo.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/VersionInfo.java Thu Feb 21 06:38:33 2013
@@ -21,9 +21,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hama.HamaVersionAnnotation;
 
-
 /**
- * A version information class. The code is picked from Apache Hadoop and 
+ * A version information class. The code is picked from Apache Hadoop and
  * adapted for Hama code-base.
  */
 public class VersionInfo {
@@ -40,6 +39,7 @@ public class VersionInfo {
 
   /**
    * Get the meta-data for the Hama package.
+   * 
    * @return the current package
    */
   static Package getPackage() {
@@ -48,6 +48,7 @@ public class VersionInfo {
 
   /**
    * Get the Hama version.
+   * 
    * @return the Hama version string, eg. "0.6.3-dev"
    */
   public static String getVersion() {
@@ -56,6 +57,7 @@ public class VersionInfo {
 
   /**
    * Get the subversion revision number for the root directory
+   * 
    * @return the revision number, eg. "451451"
    */
   public static String getRevision() {
@@ -64,6 +66,7 @@ public class VersionInfo {
 
   /**
    * Get the branch on which this originated.
+   * 
    * @return The branch name, e.g. "trunk" or "branches/branch-0.20"
    */
   public static String getBranch() {
@@ -72,6 +75,7 @@ public class VersionInfo {
 
   /**
    * The date that Hama was compiled.
+   * 
    * @return the compilation date in unix date format
    */
   public static String getDate() {
@@ -80,6 +84,7 @@ public class VersionInfo {
 
   /**
    * The user that compiled Hama.
+   * 
    * @return the username of the user
    */
   public static String getUser() {
@@ -94,22 +99,19 @@ public class VersionInfo {
   }
 
   /**
-   * Get the checksum of the source files from which Hama was
-   * built.
+   * Get the checksum of the source files from which Hama was built.
    **/
   public static String getSrcChecksum() {
     return version != null ? version.srcChecksum() : "Unknown";
   }
 
   /**
-   * Returns the buildVersion which includes version, 
-   * revision, user and date. 
+   * Returns the buildVersion which includes version, revision, user and date.
    */
-  public static String getBuildVersion(){
-    return VersionInfo.getVersion() + 
-        " from " + VersionInfo.getRevision() +
-        " by " + VersionInfo.getUser() + 
-        " source checksum " + VersionInfo.getSrcChecksum();
+  public static String getBuildVersion() {
+    return VersionInfo.getVersion() + " from " + VersionInfo.getRevision()
+        + " by " + VersionInfo.getUser() + " source checksum "
+        + VersionInfo.getSrcChecksum();
   }
 
   public static void main(String[] args) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/zookeeper/ZKServerTool.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/zookeeper/ZKServerTool.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/zookeeper/ZKServerTool.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/zookeeper/ZKServerTool.java Thu Feb 21 06:38:33 2013
@@ -17,8 +17,8 @@
  */
 package org.apache.hama.zookeeper;
 
-import java.util.Properties;
 import java.util.Map.Entry;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.HamaConfiguration;
@@ -27,7 +27,7 @@ import org.apache.hama.HamaConfiguration
  * A tool class for Zookeeper use.
  */
 public class ZKServerTool {
-  
+
   /**
    * Run the tool.
    * 

Modified: hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java Thu Feb 21 06:38:33 2013
@@ -29,7 +29,7 @@ public abstract class HamaCluster extend
   public static final Log LOG = LogFactory.getLog(HamaCluster.class);
   private final static HamaConfiguration conf = new HamaConfiguration();
 
-  public HamaCluster(){
+  public HamaCluster() {
     super();
   }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java Thu Feb 21 06:38:33 2013
@@ -56,7 +56,7 @@ public abstract class HamaClusterTestCas
     conf.set("hama.zookeeper.property.clientPort", Integer.toString(clientPort));
     conf.set(Constants.GROOM_RPC_HOST, "localhost");
     assertEquals(conf.get(Constants.GROOM_RPC_HOST), "localhost");
-    bspCluster = new MiniBSPCluster(this.conf, numOfGroom); 
+    bspCluster = new MiniBSPCluster(this.conf, numOfGroom);
     bspCluster.startBSPCluster();
   }
 
@@ -66,14 +66,14 @@ public abstract class HamaClusterTestCas
       if (this.startDfs) {
         // This spews a bunch of warnings about missing scheme. TODO: fix.
         this.dfsCluster = new MiniDFSCluster(0, this.conf, 2, true, true, true,
-          null, null, null, null);
+            null, null, null, null);
 
         // mangle the conf so that the fs parameter points to the minidfs we
         // just started up
         FileSystem filesystem = dfsCluster.getFileSystem();
         conf.set("fs.defaultFS", filesystem.getUri().toString());
         Path parentdir = filesystem.getHomeDirectory();
-        
+
         filesystem.mkdirs(parentdir);
       }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java Thu Feb 21 06:38:33 2013
@@ -32,14 +32,14 @@ import org.apache.hama.util.Bytes;
 
 public abstract class HamaTestCase extends TestCase {
   private static Log LOG = LogFactory.getLog(HamaTestCase.class);
-  
+
   /** configuration parameter name for test directory */
   public static final String TEST_DIRECTORY_KEY = "test.build.data";
 
   private boolean localfs = false;
   protected Path testDir = null;
   protected FileSystem fs = null;
-  
+
   static {
     initialize();
   }
@@ -51,7 +51,7 @@ public abstract class HamaTestCase exten
     super();
     init();
   }
-  
+
   /**
    * @param name
    */
@@ -59,7 +59,7 @@ public abstract class HamaTestCase exten
     super(name);
     init();
   }
-  
+
   private void init() {
     conf = new HamaConfiguration();
     System.setProperty("hama.log.dir", "/tmp/hama-test/logs/");
@@ -76,8 +76,7 @@ public abstract class HamaTestCase exten
   @Override
   protected void setUp() throws Exception {
     super.setUp();
-    localfs =
-      (conf.get("fs.defaultFS", "file:///").compareTo("file:///") == 0);
+    localfs = (conf.get("fs.defaultFS", "file:///").compareTo("file:///") == 0);
 
     if (fs == null) {
       this.fs = FileSystem.get(conf);
@@ -89,8 +88,7 @@ public abstract class HamaTestCase exten
           fs.delete(testDir, true);
         }
       } else {
-        this.testDir =
-          this.fs.makeQualified(new Path("/tmp/hama-test"));
+        this.testDir = this.fs.makeQualified(new Path("/tmp/hama-test"));
       }
     } catch (Exception e) {
       LOG.fatal("error during setup", e);
@@ -113,28 +111,28 @@ public abstract class HamaTestCase exten
   }
 
   protected Path getUnitTestdir(String testName) {
-    return new Path(
-        conf.get(TEST_DIRECTORY_KEY, "/tmp/hama-test/build/data"), testName);
+    return new Path(conf.get(TEST_DIRECTORY_KEY, "/tmp/hama-test/build/data"),
+        testName);
   }
 
   /**
    * Initializes parameters used in the test environment:
-   *
+   * 
    * Sets the configuration parameter TEST_DIRECTORY_KEY if not already set.
-   * Sets the boolean debugging if "DEBUGGING" is set in the environment.
-   * If debugging is enabled, reconfigures logging so that the root log level is
+   * Sets the boolean debugging if "DEBUGGING" is set in the environment. If
+   * debugging is enabled, reconfigures logging so that the root log level is
    * set to WARN and the logging level for the package is set to DEBUG.
    */
   public static void initialize() {
     if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
-      System.setProperty(TEST_DIRECTORY_KEY, new File(
-          "build/hama/test").getAbsolutePath());
+      System.setProperty(TEST_DIRECTORY_KEY,
+          new File("build/hama/test").getAbsolutePath());
     }
   }
 
   /**
    * Common method to close down a MiniDFSCluster and the associated file system
-   *
+   * 
    * @param cluster
    */
   public static void shutdownDfs(MiniDFSCluster cluster) {
@@ -143,7 +141,7 @@ public abstract class HamaTestCase exten
       try {
         cluster.shutdown();
       } catch (Exception e) {
-        /// Can get a java.lang.reflect.UndeclaredThrowableException thrown
+        // / Can get a java.lang.reflect.UndeclaredThrowableException thrown
         // here because of an InterruptedException. Don't let exceptions in
         // here be cause of test failure.
       }
@@ -160,21 +158,18 @@ public abstract class HamaTestCase exten
     }
   }
 
-  public void assertByteEquals(byte[] expected,
-                               byte[] actual) {
+  public void assertByteEquals(byte[] expected, byte[] actual) {
     if (Bytes.compareTo(expected, actual) != 0) {
-      throw new AssertionFailedError("expected:<" +
-      Bytes.toString(expected) + "> but was:<" +
-      Bytes.toString(actual) + ">");
+      throw new AssertionFailedError("expected:<" + Bytes.toString(expected)
+          + "> but was:<" + Bytes.toString(actual) + ">");
     }
   }
 
-  public static void assertEquals(byte[] expected,
-                               byte[] actual) {
+  public static void assertEquals(byte[] expected, byte[] actual) {
     if (Bytes.compareTo(expected, actual) != 0) {
-      throw new AssertionFailedError("expected:<" +
-      Bytes.toStringBinary(expected) + "> but was:<" +
-      Bytes.toStringBinary(actual) + ">");
+      throw new AssertionFailedError("expected:<"
+          + Bytes.toStringBinary(expected) + "> but was:<"
+          + Bytes.toStringBinary(actual) + ">");
     }
   }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java Thu Feb 21 06:38:33 2013
@@ -17,23 +17,21 @@
  */
 package org.apache.hama;
 
-import java.io.IOException;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static junit.framework.Assert.fail;
 
+import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import static java.util.concurrent.TimeUnit.*;
-
-import static junit.framework.Assert.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hama.bsp.BSPMaster;
 import org.apache.hama.bsp.GroomServer;
-import org.apache.hama.HamaConfiguration;
 
 public class MiniBSPCluster {
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java Thu Feb 21 06:38:33 2013
@@ -35,7 +35,7 @@ import org.apache.zookeeper.server.ZooKe
 import org.apache.zookeeper.server.persistence.FileTxnLog;
 
 /**
- * This class starts and runs the MiniZookeeperCluster. 
+ * This class starts and runs the MiniZookeeperCluster.
  */
 public class MiniZooKeeperCluster {
   private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
@@ -77,8 +77,7 @@ public class MiniZooKeeperCluster {
    * @throws IOException
    * @throws InterruptedException
    */
-  public int startup(File baseDir) throws IOException,
-      InterruptedException {
+  public int startup(File baseDir) throws IOException, InterruptedException {
 
     setupTestEnv();
 
@@ -96,12 +95,12 @@ public class MiniZooKeeperCluster {
     ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
     while (true) {
       try {
-        standaloneServerFactory =
-          new NIOServerCnxnFactory();
-          standaloneServerFactory.configure(new InetSocketAddress(clientPort), CONNECTION_TIMEOUT);
+        standaloneServerFactory = new NIOServerCnxnFactory();
+        standaloneServerFactory.configure(new InetSocketAddress(clientPort),
+            CONNECTION_TIMEOUT);
       } catch (BindException e) {
         LOG.info("Faild binding ZK Server to client port: " + clientPort);
-        //this port is already in use. try to use another
+        // this port is already in use. try to use another
         clientPort++;
         continue;
       }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Thu Feb 21 06:38:33 2013
@@ -85,7 +85,7 @@ public class TestBSPMasterGroomServer ex
     bsp.setOutputKeyClass(IntWritable.class);
     bsp.setOutputValueClass(Text.class);
     bsp.setOutputPath(OUTPUT_PATH);
-    
+
     bsp.setCompressionCodec(SnappyCompressor.class);
     bsp.setCompressionThreshold(40);
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java Thu Feb 21 06:38:33 2013
@@ -48,7 +48,8 @@ public class TestClusterStatus extends T
       int num = rnd.nextInt();
       String groomName = "groom_" + num;
       String peerName = "peerhost:" + num;
-      grooms.put(groomName, new GroomServerStatus(peerName, new ArrayList<TaskStatus>(0), 25, 2));
+      grooms.put(groomName, new GroomServerStatus(peerName,
+          new ArrayList<TaskStatus>(0), 25, 2));
     }
 
     int tasks = rnd.nextInt(100);
@@ -62,10 +63,11 @@ public class TestClusterStatus extends T
 
     ClusterStatus status2 = new ClusterStatus();
     status2.readFields(in);
-    
-    for(Entry<String, GroomServerStatus> entry : status2.getActiveGroomServerStatus().entrySet()){
-      assertEquals(entry.getValue().getMaxTasks(),2);
-      assertEquals(entry.getValue().getFailures(),25);
+
+    for (Entry<String, GroomServerStatus> entry : status2
+        .getActiveGroomServerStatus().entrySet()) {
+      assertEquals(entry.getValue().getMaxTasks(), 2);
+      assertEquals(entry.getValue().getFailures(), 25);
     }
 
     Map<String, String> grooms_s = new HashMap<String, String>(

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java Thu Feb 21 06:38:33 2013
@@ -24,7 +24,8 @@ public class TestFileInputFormat extends
   public void testComputeGoalSize() throws Exception {
 
     TextInputFormat input = new TextInputFormat();
-    assertTrue(1000 < input.computeGoalSize(10, 10000) && 1200 > input.computeGoalSize(10, 10000));
-    
+    assertTrue(1000 < input.computeGoalSize(10, 10000)
+        && 1200 > input.computeGoalSize(10, 10000));
+
   }
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java Thu Feb 21 06:38:33 2013
@@ -33,13 +33,13 @@ public class TestLocalRunner extends Tes
     conf.set("bsp.local.dir", "/tmp/hama-test");
     BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
     bsp.setJobName("Test Serialize Printing with Output");
-    
+
     bsp.setBspClass(org.apache.hama.examples.ClassSerializePrinting.class);
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
     bsp.setOutputKeyClass(IntWritable.class);
     bsp.setOutputValueClass(Text.class);
     bsp.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
-    
+
     conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
     bsp.setNumBspTask(2);
     bsp.setInputFormat(NullInputFormat.class);

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Thu Feb 21 06:38:33 2013
@@ -54,7 +54,7 @@ public class TestPartitioning extends Te
     bsp.setInputPath(new Path("../CHANGES.txt"));
     bsp.setPartitioner(HashPartitioner.class);
     assertTrue(bsp.waitForCompletion(true));
-    
+
     FileSystem fs = FileSystem.get(conf);
     fs.delete(new Path("/tmp/hama-test/partitioning/localtest"), true);
   }
@@ -63,12 +63,14 @@ public class TestPartitioning extends Te
       BSP<LongWritable, Text, NullWritable, NullWritable, NullWritable> {
 
     @Override
-    public void bsp(BSPPeer<LongWritable, Text, NullWritable, NullWritable, NullWritable> peer)
+    public void bsp(
+        BSPPeer<LongWritable, Text, NullWritable, NullWritable, NullWritable> peer)
         throws IOException, SyncException, InterruptedException {
       long numOfPairs = 0;
       KeyValuePair<LongWritable, Text> readNext = null;
       while ((readNext = peer.readNext()) != null) {
-        LOG.debug(readNext.getKey().get() + " / " + readNext.getValue().toString());
+        LOG.debug(readNext.getKey().get() + " / "
+            + readNext.getValue().toString());
         numOfPairs++;
       }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Thu Feb 21 06:38:33 2013
@@ -72,11 +72,11 @@ public class TestZooKeeper extends TestC
       executorService.submit(new Runnable() {
         @Override
         public void run() {
-            try {
-                server.start();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+          try {
+            server.start();
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
         }
       });
 
@@ -127,10 +127,10 @@ public class TestZooKeeper extends TestC
 
       Log.info("Passed the child count test");
 
-      masterClient.addKey(masterClient.constructKey(jobID, "peer", "1"),
-          true, null);
-      masterClient.addKey(masterClient.constructKey(jobID, "peer", "2"),
-          true, null);
+      masterClient.addKey(masterClient.constructKey(jobID, "peer", "1"), true,
+          null);
+      masterClient.addKey(masterClient.constructKey(jobID, "peer", "2"), true,
+          null);
 
       String[] peerChild = masterClient.getChildKeySet(
           masterClient.constructKey(jobID, "peer"), null);
@@ -157,26 +157,26 @@ public class TestZooKeeper extends TestC
               new IntWritable());
 
       assertEquals(false, result);
-      
+
       Writable[] writableArr = new Writable[2];
       writableArr[0] = new LongWritable(3L);
       writableArr[1] = new LongWritable(5L);
       ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
       arrWritable.set(writableArr);
       masterClient.storeInformation(
-          masterClient.constructKey(jobID, "info", "level3"), 
-          arrWritable, true, null);
-      
+          masterClient.constructKey(jobID, "info", "level3"), arrWritable,
+          true, null);
+
       ArrayWritable valueHolder = new ArrayWritable(LongWritable.class);
-      
+
       boolean getResult = masterClient.getInformation(
           masterClient.constructKey(jobID, "info", "level3"), valueHolder);
-      
+
       assertTrue(getResult);
-      
+
       assertEquals(arrWritable.get()[0], valueHolder.get()[0]);
       assertEquals(arrWritable.get()[1], valueHolder.get()[1]);
-      
+
       Log.info("Passed array writable test");
       done = true;
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java Thu Feb 21 06:38:33 2013
@@ -35,6 +35,7 @@ public class TestSpillingQueue extends T
 
   /**
    * Test the spilling queue where the message class is specified.
+   * 
    * @throws Exception
    */
   public void testTextSpillingQueue() throws Exception {
@@ -44,9 +45,8 @@ public class TestSpillingQueue extends T
     TaskAttemptID id = new TaskAttemptID(new TaskID("123", 1, 2), 0);
     SpillingQueue<Text> queue = new SpillingQueue<Text>();
     Configuration conf = new HamaConfiguration();
-    
-    String fileName = 
-        System.getProperty("java.io.tmpdir") + File.separatorChar
+
+    String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar
         + new BigInteger(128, new SecureRandom()).toString(32);
     File file = new File(fileName);
     conf.set(SpillingQueue.SPILLBUFFER_FILENAME, fileName);
@@ -54,25 +54,26 @@ public class TestSpillingQueue extends T
         Writable.class);
     queue.init(conf, id);
     queue.prepareWrite();
-    for(int i = 0; i < 1000; ++i){
+    for (int i = 0; i < 1000; ++i) {
       queue.add(text);
     }
     queue.prepareRead();
-    for(Text t: queue){
+    for (Text t : queue) {
       assertTrue(msg.equals(t.toString()));
       text.clear();
     }
-    
+
     assertTrue(queue.poll() == null);
-    
+
     assertTrue(file.exists());
     queue.close();
     assertFalse(file.exists());
   }
-  
+
   /**
    * Test the spilling queue where the message class is not specified and the
    * queue uses ObjectWritable to store messages.
+   * 
    * @throws Exception
    */
   public void testObjectWritableSpillingQueue() throws Exception {
@@ -82,25 +83,24 @@ public class TestSpillingQueue extends T
     TaskAttemptID id = new TaskAttemptID(new TaskID("123", 1, 2), 0);
     SpillingQueue<Text> queue = new SpillingQueue<Text>();
     Configuration conf = new HamaConfiguration();
-    
-    String fileName = 
-        System.getProperty("java.io.tmpdir") + File.separatorChar
+
+    String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar
         + new BigInteger(128, new SecureRandom()).toString(32);
     File file = new File(fileName);
     conf.set(SpillingQueue.SPILLBUFFER_FILENAME, fileName);
     queue.init(conf, id);
     queue.prepareWrite();
-    for(int i = 0; i < 1000; ++i){
+    for (int i = 0; i < 1000; ++i) {
       queue.add(text);
     }
     queue.prepareRead();
-    for(Text t: queue){
+    for (Text t : queue) {
       assertTrue(msg.equals(t.toString()));
       text.clear();
     }
-    
+
     assertTrue(queue.poll() == null);
-    
+
     assertTrue(file.exists());
     queue.close();
     assertFalse(file.exists());

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java Thu Feb 21 06:38:33 2013
@@ -36,7 +36,7 @@ public class TestBSPMessageCompressor ex
         SnappyCompressor.class, BSPMessageCompressor.class);
     compressor = new BSPMessageCompressorFactory<IntegerMessage>()
         .getCompressor(configuration);
-    
+
     assertNotNull(compressor);
 
     int n = 20;

Modified: hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java Thu Feb 21 06:38:33 2013
@@ -46,12 +46,12 @@ public class ClassSerializePrinting exte
 
         bspPeer.send(otherPeer, map);
       }
-      
+
       // Test superstep counter
       if (i != bspPeer.getSuperstepCount()) {
         throw new IOException();
       }
-      
+
       bspPeer.sync();
 
       MapWritable msg = null;

Modified: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java Thu Feb 21 06:38:33 2013
@@ -223,13 +223,13 @@ public class TestIPC extends TestCase {
     } catch (IOException e) {
       String message = e.getMessage();
       String addressText = address.toString();
-      assertTrue("Did not find " + addressText + " in " + message, message
-          .contains(addressText));
+      assertTrue("Did not find " + addressText + " in " + message,
+          message.contains(addressText));
       Throwable cause = e.getCause();
       assertNotNull("No nested exception in " + e, cause);
       String causeText = cause.getMessage();
-      assertTrue("Did not find " + causeText + " in " + message, message
-          .contains(causeText));
+      assertTrue("Did not find " + causeText + " in " + message,
+          message.contains(causeText));
     }
   }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java Thu Feb 21 06:38:33 2013
@@ -22,6 +22,8 @@ import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 
+import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -30,8 +32,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.VersionedProtocol;
 
-import junit.framework.TestCase;
-
 public class TestRPC extends TestCase {
   private static final int PORT = 1234;
   private static final String ADDRESS = "0.0.0.0";

Modified: hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java Thu Feb 21 06:38:33 2013
@@ -35,10 +35,13 @@ public class TestConfigurator extends Te
    */
   public void testPluginDirNotPresented() throws Exception {
     System.setProperty("hama.home.dir", System.getProperty("user.dir"));
-    Map<String, Task> tasks = Configurator.configure(new HamaConfiguration(), null);    
-    LOG.info("Plugins dir is not created, returned tasks should be null -> "+tasks);
-    assertNull("Tasks returned should be null because no plugins dir is created.", tasks);
+    Map<String, Task> tasks = Configurator.configure(new HamaConfiguration(),
+        null);
+    LOG.info("Plugins dir is not created, returned tasks should be null -> "
+        + tasks);
+    assertNull(
+        "Tasks returned should be null because no plugins dir is created.",
+        tasks);
   }
 
-  
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java Thu Feb 21 06:38:33 2013
@@ -23,76 +23,77 @@ import org.apache.commons.logging.LogFac
 import org.apache.hama.HamaCluster;
 
 /**
- * Test case for Phi accrual fail detector. 
+ * Test case for Phi accrual fail detector.
  */
 public class TestFD extends HamaCluster {
   public static final Log LOG = LogFactory.getLog(TestFD.class);
-//  final HamaConfiguration conf;
-//  final ScheduledExecutorService sched;
-//
-//  public TestFD() {
-//    this.conf = getConf();
-//    this.sched = Executors.newScheduledThreadPool(10);
-//  }
-//
-//  public void setUp() throws Exception { }
-//
-//   * Test Phi Accrual Fialure Detector.
-//  public void testCumulativeDistributedFunction() throws Exception {
-//    this.conf.setInt("bsp.monitor.fd.udp_port", 9765);
-//    UDPSupervisor server = new UDPSupervisor(this.conf);
-//    UDPSensor client = new UDPSensor(this.conf); 
-//    this.sched.schedule(server, 0, SECONDS);
-//    this.sched.schedule(client, 2, SECONDS);
-//    boolean flag = true;
-//    int count = 0;
-//    while(flag){
-//      count++;
-//      Thread.sleep(1000*3);
-//      double phi = server.suspicionLevel("localhost");
-//      if(LOG.isDebugEnabled())
-//        LOG.debug("Phi value:"+phi+" Double.isInfinite(phi):"+Double.isInfinite(phi));
-//      assertTrue("In normal case phi should not go infinity!", !Double.isInfinite(phi));
-//      if(10 < count){
-//        flag = false;
-//      }
-//    }
-//    client.shutdown();
-//    server.shutdown();
-//    LOG.info("Finished testing suspicion level.");
-//  }
-//
-//   * Test when sensor fails.
-//  public void testSensorFailure() throws Exception{
-//    this.conf.setInt("bsp.monitor.fd.udp_port", 2874);
-//    UDPSupervisor server = new UDPSupervisor(this.conf);
-//    UDPSensor client = new UDPSensor(this.conf); 
-//    this.sched.schedule(server, 0, SECONDS);
-//    this.sched.schedule(client, 2, SECONDS);
-//    int count = 0;
-//    boolean flag = true;
-//    while(flag){
-//      count++;
-//      double phi = server.suspicionLevel("localhost");
-//      Thread.sleep(1000*3);
-//      if(5 < count){
-//        client.shutdown(); 
-//        Thread.sleep(1000*4);
-//        phi = server.suspicionLevel("localhost");
-//        if(LOG.isDebugEnabled())
-//          LOG.debug("Phi value should go infinity:"+Double.isInfinite(phi));
-//        assertTrue("In normal case phi should not go infinity!", Double.isInfinite(phi));
-//      }
-//      if(10 < count){
-//        flag = false;
-//      }
-//    }
-//    server.shutdown();
-//    LOG.info("Finished testing client failure case.");
-//  }
-//  
-//  public void tearDown() throws Exception { 
-//    sched.shutdown();
-//  }
+  // final HamaConfiguration conf;
+  // final ScheduledExecutorService sched;
+  //
+  // public TestFD() {
+  // this.conf = getConf();
+  // this.sched = Executors.newScheduledThreadPool(10);
+  // }
+  //
+  // public void setUp() throws Exception { }
+  //
+  // * Test Phi Accrual Fialure Detector.
+  // public void testCumulativeDistributedFunction() throws Exception {
+  // this.conf.setInt("bsp.monitor.fd.udp_port", 9765);
+  // UDPSupervisor server = new UDPSupervisor(this.conf);
+  // UDPSensor client = new UDPSensor(this.conf);
+  // this.sched.schedule(server, 0, SECONDS);
+  // this.sched.schedule(client, 2, SECONDS);
+  // boolean flag = true;
+  // int count = 0;
+  // while(flag){
+  // count++;
+  // Thread.sleep(1000*3);
+  // double phi = server.suspicionLevel("localhost");
+  // if(LOG.isDebugEnabled())
+  // LOG.debug("Phi value:"+phi+" Double.isInfinite(phi):"+Double.isInfinite(phi));
+  // assertTrue("In normal case phi should not go infinity!",
+  // !Double.isInfinite(phi));
+  // if(10 < count){
+  // flag = false;
+  // }
+  // }
+  // client.shutdown();
+  // server.shutdown();
+  // LOG.info("Finished testing suspicion level.");
+  // }
+  //
+  // * Test when sensor fails.
+  // public void testSensorFailure() throws Exception{
+  // this.conf.setInt("bsp.monitor.fd.udp_port", 2874);
+  // UDPSupervisor server = new UDPSupervisor(this.conf);
+  // UDPSensor client = new UDPSensor(this.conf);
+  // this.sched.schedule(server, 0, SECONDS);
+  // this.sched.schedule(client, 2, SECONDS);
+  // int count = 0;
+  // boolean flag = true;
+  // while(flag){
+  // count++;
+  // double phi = server.suspicionLevel("localhost");
+  // Thread.sleep(1000*3);
+  // if(5 < count){
+  // client.shutdown();
+  // Thread.sleep(1000*4);
+  // phi = server.suspicionLevel("localhost");
+  // if(LOG.isDebugEnabled())
+  // LOG.debug("Phi value should go infinity:"+Double.isInfinite(phi));
+  // assertTrue("In normal case phi should not go infinity!",
+  // Double.isInfinite(phi));
+  // }
+  // if(10 < count){
+  // flag = false;
+  // }
+  // }
+  // server.shutdown();
+  // LOG.info("Finished testing client failure case.");
+  // }
+  //
+  // public void tearDown() throws Exception {
+  // sched.shutdown();
+  // }
 }
-

Modified: hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java Thu Feb 21 06:38:33 2013
@@ -24,7 +24,7 @@ import junit.framework.TestCase;
 
 public class TestBytes extends TestCase {
   public void testNullHashCode() {
-    byte [] b = null;
+    byte[] b = null;
     Exception ee = null;
     try {
       Bytes.hashCode(b);
@@ -35,16 +35,16 @@ public class TestBytes extends TestCase 
   }
 
   public void testSplit() throws Exception {
-    byte [] lowest = Bytes.toBytes("AAA");
-    byte [] middle = Bytes.toBytes("CCC");
-    byte [] highest = Bytes.toBytes("EEE");
-    byte [][] parts = Bytes.split(lowest, highest, 1);
+    byte[] lowest = Bytes.toBytes("AAA");
+    byte[] middle = Bytes.toBytes("CCC");
+    byte[] highest = Bytes.toBytes("EEE");
+    byte[][] parts = Bytes.split(lowest, highest, 1);
     for (int i = 0; i < parts.length; i++) {
       System.out.println(Bytes.toString(parts[i]));
     }
     assertEquals(3, parts.length);
     assertTrue(Bytes.equals(parts[1], middle));
-    // Now divide into three parts.  Change highest so split is even.
+    // Now divide into three parts. Change highest so split is even.
     highest = Bytes.toBytes("DDD");
     parts = Bytes.split(lowest, highest, 2);
     for (int i = 0; i < parts.length; i++) {
@@ -57,10 +57,10 @@ public class TestBytes extends TestCase 
 
   public void testSplit2() throws Exception {
     // More split tests.
-    byte [] lowest = Bytes.toBytes("http://A");
-    byte [] highest = Bytes.toBytes("http://z");
-    byte [] middle = Bytes.toBytes("http://]");
-    byte [][] parts = Bytes.split(lowest, highest, 1);
+    byte[] lowest = Bytes.toBytes("http://A");
+    byte[] highest = Bytes.toBytes("http://z");
+    byte[] middle = Bytes.toBytes("http://]");
+    byte[][] parts = Bytes.split(lowest, highest, 1);
     for (int i = 0; i < parts.length; i++) {
       System.out.println(Bytes.toString(parts[i]));
     }
@@ -69,61 +69,52 @@ public class TestBytes extends TestCase 
   }
 
   public void testToLong() throws Exception {
-    long [] longs = {-1l, 123l, 122232323232l};
+    long[] longs = { -1l, 123l, 122232323232l };
     for (int i = 0; i < longs.length; i++) {
-      byte [] b = Bytes.toBytes(longs[i]);
+      byte[] b = Bytes.toBytes(longs[i]);
       assertEquals(longs[i], Bytes.toLong(b));
     }
   }
 
   public void testToFloat() throws Exception {
-    float [] floats = {-1f, 123.123f, Float.MAX_VALUE};
+    float[] floats = { -1f, 123.123f, Float.MAX_VALUE };
     for (int i = 0; i < floats.length; i++) {
-      byte [] b = Bytes.toBytes(floats[i]);
+      byte[] b = Bytes.toBytes(floats[i]);
       assertEquals(floats[i], Bytes.toFloat(b));
     }
   }
 
   public void testToDouble() throws Exception {
-    double [] doubles = {Double.MIN_VALUE, Double.MAX_VALUE};
+    double[] doubles = { Double.MIN_VALUE, Double.MAX_VALUE };
     for (int i = 0; i < doubles.length; i++) {
-      byte [] b = Bytes.toBytes(doubles[i]);
+      byte[] b = Bytes.toBytes(doubles[i]);
       assertEquals(doubles[i], Bytes.toDouble(b));
     }
   }
 
   public void testBinarySearch() throws Exception {
-    byte [][] arr = {
-        {1},
-        {3},
-        {5},
-        {7},
-        {9},
-        {11},
-        {13},
-        {15},
-    };
-    byte [] key1 = {3,1};
-    byte [] key2 = {4,9};
-    byte [] key2_2 = {4};
-    byte [] key3 = {5,11};
-    
-    assertEquals(1, Bytes.binarySearch(arr, key1, 0, 1,
-      Bytes.BYTES_RAWCOMPARATOR));
-    assertEquals(0, Bytes.binarySearch(arr, key1, 1, 1,
-      Bytes.BYTES_RAWCOMPARATOR));
-    assertEquals(-(2+1), Arrays.binarySearch(arr, key2_2,
-      Bytes.BYTES_COMPARATOR));
-    assertEquals(-(2+1), Bytes.binarySearch(arr, key2, 0, 1,
-      Bytes.BYTES_RAWCOMPARATOR));
-    assertEquals(4, Bytes.binarySearch(arr, key2, 1, 1,
-      Bytes.BYTES_RAWCOMPARATOR));
-    assertEquals(2, Bytes.binarySearch(arr, key3, 0, 1,
-      Bytes.BYTES_RAWCOMPARATOR));
-    assertEquals(5, Bytes.binarySearch(arr, key3, 1, 1,
-      Bytes.BYTES_RAWCOMPARATOR));
+    byte[][] arr = { { 1 }, { 3 }, { 5 }, { 7 }, { 9 }, { 11 }, { 13 }, { 15 }, };
+    byte[] key1 = { 3, 1 };
+    byte[] key2 = { 4, 9 };
+    byte[] key2_2 = { 4 };
+    byte[] key3 = { 5, 11 };
+
+    assertEquals(1,
+        Bytes.binarySearch(arr, key1, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(0,
+        Bytes.binarySearch(arr, key1, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(-(2 + 1),
+        Arrays.binarySearch(arr, key2_2, Bytes.BYTES_COMPARATOR));
+    assertEquals(-(2 + 1),
+        Bytes.binarySearch(arr, key2, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(4,
+        Bytes.binarySearch(arr, key2, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(2,
+        Bytes.binarySearch(arr, key3, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
+    assertEquals(5,
+        Bytes.binarySearch(arr, key3, 1, 1, Bytes.BYTES_RAWCOMPARATOR));
   }
-  
+
   public void testIncrementBytes() throws IOException {
 
     assertTrue(checkTestIncrementBytes(10, 1));
@@ -144,11 +135,11 @@ public class TestBytes extends TestCase 
     assertTrue(checkTestIncrementBytes(-12, -34565445));
     assertTrue(checkTestIncrementBytes(-1546543452, -34565445));
   }
-  
-  private static boolean checkTestIncrementBytes(long val, long amount) 
-  throws IOException {
+
+  private static boolean checkTestIncrementBytes(long val, long amount)
+      throws IOException {
     byte[] value = Bytes.toBytes(val);
-    byte [] testValue = {-1, -1, -1, -1, -1, -1, -1, -1};
+    byte[] testValue = { -1, -1, -1, -1, -1, -1, -1, -1 };
     if (value[0] > 0) {
       testValue = new byte[Bytes.SIZEOF_LONG];
     }

Modified: hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java Thu Feb 21 06:38:33 2013
@@ -19,10 +19,10 @@
  */
 package org.apache.hama.util;
 
-import org.apache.log4j.Logger;
-
 import junit.framework.TestCase;
 
+import org.apache.log4j.Logger;
+
 /**
  * Random variable generation test
  */

Modified: hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java Thu Feb 21 06:38:33 2013
@@ -17,11 +17,11 @@
  */
 package org.apache.hama.zookeeper;
 
+import junit.framework.TestCase;
+
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 
-import junit.framework.TestCase;
-
 public class TestZKTools extends TestCase {
 
   public void testZKProps() {

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java Thu Feb 21 06:38:33 2013
@@ -181,21 +181,6 @@ public final class BipartiteMatching {
 
     }
 
-    @Override
-    public Text createVertexIDObject() {
-      return new Text();
-    }
-
-    @Override
-    public NullWritable createEdgeCostObject() {
-      return NullWritable.get();
-    }
-
-    @Override
-    public TextPair createVertexValue() {
-      return new TextPair();
-    }
-
   }
 
   /**
@@ -236,9 +221,10 @@ public final class BipartiteMatching {
     System.exit(-1);
   }
 
-  public static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException{
+  public static GraphJob createJob(String[] args, HamaConfiguration conf)
+      throws IOException {
     GraphJob job = new GraphJob(conf, BipartiteMatching.class);
-    
+
     // set the defaults
     job.setMaxIteration(30);
     job.setNumBspTask(2);
@@ -268,8 +254,7 @@ public final class BipartiteMatching {
     job.setOutputValueClass(TextPair.class);
     return job;
   }
-  
-  
+
   public static void main(String... args) throws IOException,
       InterruptedException, ClassNotFoundException {
 
@@ -278,7 +263,7 @@ public final class BipartiteMatching {
     }
 
     HamaConfiguration conf = new HamaConfiguration(new Configuration());
-    
+
     GraphJob job = createJob(args, conf);
 
     long startTime = System.currentTimeMillis();

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Thu Feb 21 06:38:33 2013
@@ -38,7 +38,7 @@ public class ExampleDriver {
       pgd.addClass("bipartite", BipartiteMatching.class, "Bipartite Matching");
       pgd.addClass("kmeans", Kmeans.class, "K-Means Clustering");
       pgd.addClass("gd", GradientDescentExample.class, "Gradient Descent");
-      
+
       pgd.addClass("gen", Generator.class, "Random Data Generator Util");
       pgd.driver(args);
     } catch (Throwable e) {

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/GradientDescentExample.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/GradientDescentExample.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/GradientDescentExample.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/GradientDescentExample.java Thu Feb 21 06:38:33 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.examples;
 
+import java.io.IOException;
+
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,8 +35,6 @@ import org.apache.hama.ml.regression.Reg
 import org.apache.hama.ml.regression.VectorDoubleFileInputFormat;
 import org.apache.hama.ml.writable.VectorWritable;
 
-import java.io.IOException;
-
 /**
  * A {@link GradientDescentBSP} job example
  */
@@ -44,10 +44,10 @@ public class GradientDescentExample {
   public static void main(String[] args) throws InterruptedException,
       IOException, ClassNotFoundException {
 
-   if (!(args.length == 1 || args.length == 2)) {
-     System.out.println("USAGE: <INPUT_PATH> [<REGRESSION_MODEL>]");
-     return;
-   }
+    if (!(args.length == 1 || args.length == 2)) {
+      System.out.println("USAGE: <INPUT_PATH> [<REGRESSION_MODEL>]");
+      return;
+    }
 
     // BSP job configuration
     HamaConfiguration conf = new HamaConfiguration();
@@ -58,14 +58,13 @@ public class GradientDescentExample {
     if (args.length == 2 && args[1] != null) {
       if (args[1].equals("logistic")) {
         conf.setClass(GradientDescentBSP.REGRESSION_MODEL_CLASS,
-          LogisticRegressionModel.class, RegressionModel.class);
-      }
-      else if (args[1].equals("linear")) {
+            LogisticRegressionModel.class, RegressionModel.class);
+      } else if (args[1].equals("linear")) {
         // do nothing as 'linear' is default
-      }
-      else {
-        throw new RuntimeException(new StringBuilder("unsupported RegressionModel").
-                append(args[1]).append(", use 'logistic' or 'linear'").toString());
+      } else {
+        throw new RuntimeException(new StringBuilder(
+            "unsupported RegressionModel").append(args[1])
+            .append(", use 'logistic' or 'linear'").toString());
       }
     }
 

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Thu Feb 21 06:38:33 2013
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.examples;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -28,12 +26,14 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.TextArrayWritable;
+import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
 
+import com.google.common.base.Optional;
+
 public class InlinkCount extends Vertex<Text, NullWritable, IntWritable> {
 
   @Override
@@ -56,28 +56,24 @@ public class InlinkCount extends Vertex<
     System.exit(-1);
   }
 
-  public static void main(String[] args) throws IOException,
-      InterruptedException, ClassNotFoundException {
-
-    if (args.length < 2)
-      printUsage();
-
+  public static GraphJob getJob(String inpath, String outpath,
+      Optional<Integer> numBspTasks) throws IOException {
     // Graph job configuration
     HamaConfiguration conf = new HamaConfiguration();
     GraphJob inlinkJob = new GraphJob(conf, InlinkCount.class);
     // Set the job name
     inlinkJob.setJobName("Inlink Count");
 
-    inlinkJob.setInputPath(new Path(args[0]));
-    inlinkJob.setOutputPath(new Path(args[1]));
+    inlinkJob.setInputPath(new Path(inpath));
+    inlinkJob.setOutputPath(new Path(outpath));
 
-    if (args.length == 3) {
-      inlinkJob.setNumBspTask(Integer.parseInt(args[2]));
+    if (numBspTasks.isPresent()) {
+      inlinkJob.setNumBspTask(numBspTasks.get());
     }
 
     inlinkJob.setVertexClass(InlinkCount.class);
 
-    inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+    inlinkJob.setInputFormat(TextInputFormat.class);
     inlinkJob.setInputKeyClass(Text.class);
     inlinkJob.setInputValueClass(TextArrayWritable.class);
 
@@ -89,6 +85,18 @@ public class InlinkCount extends Vertex<
     inlinkJob.setOutputFormat(SequenceFileOutputFormat.class);
     inlinkJob.setOutputKeyClass(Text.class);
     inlinkJob.setOutputValueClass(IntWritable.class);
+    return inlinkJob;
+  }
+
+  public static void main(String[] args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    if (args.length < 2)
+      printUsage();
+
+    Optional<Integer> absent = Optional.absent();
+    GraphJob inlinkJob = getJob(args[0], args[1],
+        args.length >= 3 ? Optional.of(Integer.parseInt(args[3])) : absent);
 
     long startTime = System.currentTimeMillis();
     if (inlinkJob.waitForCompletion(true)) {
@@ -96,27 +104,4 @@ public class InlinkCount extends Vertex<
           + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
-
-  @Override
-  public void readState(DataInput in) throws IOException {}
-
-  @Override
-  public void writeState(DataOutput out) throws IOException {}
-
-  @Override
-  public Text createVertexIDObject() {
-    return new Text();
-  }
-
-  @Override
-  public NullWritable createEdgeCostObject() {
-    return NullWritable.get();
-  }
-
-  @Override
-  public IntWritable createVertexValue() {
-    return new IntWritable();
-  }
-
-  
 }

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java Thu Feb 21 06:38:33 2013
@@ -76,7 +76,7 @@ public class Kmeans {
     int k = Integer.parseInt(args[3]);
     if (args.length == 7 && args[4].equals("-g")) {
       int count = Integer.parseInt(args[5]);
-      if(k > count)
+      if (k > count)
         throw new IllegalArgumentException("K can't be greater than n!");
       int dimension = Integer.parseInt(args[6]);
       System.out.println("N: " + count + " Dimension: " + dimension

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Thu Feb 21 06:38:33 2013
@@ -17,24 +17,25 @@
  */
 package org.apache.hama.examples;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
-import org.apache.hama.bsp.TextArrayWritable;
+import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+
+import com.google.common.base.Optional;
 
 /**
  * Finding the mindist vertex in a connected component.
@@ -82,28 +83,26 @@ public class MindistSearch {
       }
     }
 
-    @Override
-    public void readState(DataInput in) throws IOException {}
-
-    @Override
-    public void writeState(DataOutput out) throws IOException {}
-
-    @Override
-    public Text createVertexIDObject() {
-      return new Text();
-    }
+  }
 
-    @Override
-    public NullWritable createEdgeCostObject() {
-      return NullWritable.get();
-    }
+  public static class MindistSearchTextReader extends
+      VertexInputReader<LongWritable, Text, Text, NullWritable, Text> {
 
     @Override
-    public Text createVertexValue() {
-      return new Text();
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, NullWritable, Text> vertex) throws Exception {
+      String[] split = value.toString().split("\t");
+      for (int i = 0; i < split.length; i++) {
+        if (i == 0) {
+          vertex.setVertexID(new Text(split[i]));
+        } else {
+          vertex
+              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
+        }
+      }
+      return true;
     }
 
-    
   }
 
   public static class MinTextCombiner extends Combiner<Text> {
@@ -127,40 +126,52 @@ public class MindistSearch {
     System.exit(-1);
   }
 
-  public static void main(String[] args) throws IOException,
-      InterruptedException, ClassNotFoundException {
-    if (args.length < 2)
-      printUsage();
+  public static GraphJob getJob(String inpath, String outpath,
+      Optional<Integer> numTasks, Optional<Integer> numIterations)
+      throws IOException {
 
     HamaConfiguration conf = new HamaConfiguration(new Configuration());
     GraphJob job = new GraphJob(conf, MindistSearchVertex.class);
     job.setJobName("Mindist Search");
 
     job.setVertexClass(MindistSearchVertex.class);
-    job.setInputPath(new Path(args[0]));
-    job.setOutputPath(new Path(args[1]));
+    job.setInputPath(new Path(inpath));
+    job.setOutputPath(new Path(outpath));
     // set the min text combiner here
     job.setCombinerClass(MinTextCombiner.class);
 
     // set the defaults
     job.setMaxIteration(30);
-    if (args.length == 4)
-      job.setNumBspTask(Integer.parseInt(args[3]));
-    if (args.length >= 3)
-      job.setMaxIteration(Integer.parseInt(args[2]));
+    if (numTasks.isPresent())
+      job.setNumBspTask(numTasks.get());
+    if (numIterations.isPresent())
+      job.setMaxIteration(numIterations.get());
 
     job.setVertexIDClass(Text.class);
     job.setVertexValueClass(Text.class);
     job.setEdgeValueClass(NullWritable.class);
 
-    job.setInputFormat(SequenceFileInputFormat.class);
-    job.setInputKeyClass(Text.class);
-    job.setInputValueClass(TextArrayWritable.class);
+    job.setInputFormat(TextInputFormat.class);
+    job.setInputKeyClass(LongWritable.class);
+    job.setInputValueClass(Text.class);
+    job.setVertexInputReaderClass(MindistSearchTextReader.class);
 
     job.setPartitioner(HashPartitioner.class);
     job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
+    return job;
+  }
+
+  public static void main(String[] args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    if (args.length < 2)
+      printUsage();
+
+    Optional<Integer> absent = Optional.absent();
+    GraphJob job = getJob(args[0], args[1],
+        args.length >= 3 ? Optional.of(Integer.parseInt(args[3])) : absent,
+        args.length >= 4 ? Optional.of(Integer.parseInt(args[4])) : absent);
 
     long startTime = System.currentTimeMillis();
     if (job.waitForCompletion(true)) {
@@ -168,5 +179,4 @@ public class MindistSearch {
           + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
-
 }



Mime
View raw message