hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1448523 [2/4] - in /hama/trunk: ./ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/bsp/message/compress/ core/src/main/java/org/apach...
Date Thu, 21 Feb 2013 06:38:36 GMT
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java Thu Feb 21 06:38:33 2013
@@ -131,10 +131,9 @@ public class AsyncRcvdMsgCheckpointImpl<
 
       Map<TaskID, TaskInProgress> recoverySet = new HashMap<TaskID, TaskInProgress>(
           2 * failedTasksInProgress.length);
-        for (TaskInProgress failedTasksInProgres : failedTasksInProgress) {
-            recoverySet.put(failedTasksInProgres.getTaskId(),
-                    failedTasksInProgres);
-        }
+      for (TaskInProgress failedTasksInProgres : failedTasksInProgress) {
+        recoverySet.put(failedTasksInProgres.getTaskId(), failedTasksInProgres);
+      }
 
       long lowestSuperstepNumber = Long.MAX_VALUE;
 
@@ -152,31 +151,31 @@ public class AsyncRcvdMsgCheckpointImpl<
       }
 
       if (taskProgress.length == this.tasks.length) {
-          for (String taskProgres : taskProgress) {
-              ArrayWritable progressInformation = new ArrayWritable(
-                      LongWritable.class);
-              boolean result = this.masterSyncClient.getInformation(
-                      this.masterSyncClient.constructKey(jobId, "checkpoint",
-                              taskProgres), progressInformation);
-
-              if (!result) {
-                  lowestSuperstepNumber = -1L;
-                  break;
-              }
+        for (String taskProgres : taskProgress) {
+          ArrayWritable progressInformation = new ArrayWritable(
+              LongWritable.class);
+          boolean result = this.masterSyncClient.getInformation(
+              this.masterSyncClient.constructKey(jobId, "checkpoint",
+                  taskProgres), progressInformation);
+
+          if (!result) {
+            lowestSuperstepNumber = -1L;
+            break;
+          }
 
-              Writable[] progressArr = progressInformation.get();
-              LongWritable superstepProgress = (LongWritable) progressArr[0];
+          Writable[] progressArr = progressInformation.get();
+          LongWritable superstepProgress = (LongWritable) progressArr[0];
 
-              if (superstepProgress != null) {
-                  if (superstepProgress.get() < lowestSuperstepNumber) {
-                      lowestSuperstepNumber = superstepProgress.get();
-                      if (LOG.isDebugEnabled()) {
-                          LOG.debug("Got superstep number " + lowestSuperstepNumber
-                                  + " from " + taskProgres);
-                      }
-                  }
+          if (superstepProgress != null) {
+            if (superstepProgress.get() < lowestSuperstepNumber) {
+              lowestSuperstepNumber = superstepProgress.get();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Got superstep number " + lowestSuperstepNumber
+                    + " from " + taskProgres);
               }
+            }
           }
+        }
         clearClientForSuperstep(lowestSuperstepNumber);
         restartJob(lowestSuperstepNumber, groomStatuses, recoverySet,
             allTasksInProgress, taskCountInGroomMap, actionMap);
@@ -225,56 +224,55 @@ public class AsyncRcvdMsgCheckpointImpl<
 
       if (superstep >= 0) {
         FileSystem fileSystem = FileSystem.get(conf);
-          for (TaskInProgress allTask : allTasks) {
-              String[] hosts = null;
-              if (recoveryMap.containsKey(allTask.getTaskId())) {
-
-                  // Update task count in map.
-                  // TODO: This should be a responsibility of GroomServerStatus
-                  Integer count = taskCountInGroomMap.get(allTask
-                          .getGroomServerStatus());
-                  if (count != null) {
-                      count = count.intValue() - 1;
-                      taskCountInGroomMap
-                              .put(allTask.getGroomServerStatus(), count);
-                  }
-
-                  StringBuffer ckptPath = new StringBuffer(path);
-                  ckptPath.append(this.jobId.toString());
-                  ckptPath.append("/").append(superstep).append("/")
-                          .append(allTask.getTaskId().getId());
-                  Path checkpointPath = new Path(ckptPath.toString());
-                  if (fileSystem.exists(checkpointPath)) {
-                      FileStatus fileStatus = fileSystem.getFileStatus(checkpointPath);
-                      BlockLocation[] blocks = fileSystem.getFileBlockLocations(
-                              fileStatus, 0, fileStatus.getLen());
-                      hosts = blocks[0].getHosts();
-                  } else {
-                      hosts = new String[groomStatuses.keySet().size()];
-                      groomStatuses.keySet().toArray(hosts);
-                  }
-                  GroomServerStatus serverStatus = this.allocationStrategy
-                          .getGroomToAllocate(groomStatuses, hosts, taskCountInGroomMap,
-                                  new BSPResource[0], allTask);
-                  Task task = allTask.constructTask(serverStatus);
-                  populateAction(task, superstep, serverStatus, actionMap);
+        for (TaskInProgress allTask : allTasks) {
+          String[] hosts = null;
+          if (recoveryMap.containsKey(allTask.getTaskId())) {
+
+            // Update task count in map.
+            // TODO: This should be a responsibility of GroomServerStatus
+            Integer count = taskCountInGroomMap.get(allTask
+                .getGroomServerStatus());
+            if (count != null) {
+              count = count.intValue() - 1;
+              taskCountInGroomMap.put(allTask.getGroomServerStatus(), count);
+            }
 
-              } else {
-                  restartTask(allTask, superstep, groomStatuses, actionMap);
-              }
+            StringBuffer ckptPath = new StringBuffer(path);
+            ckptPath.append(this.jobId.toString());
+            ckptPath.append("/").append(superstep).append("/")
+                .append(allTask.getTaskId().getId());
+            Path checkpointPath = new Path(ckptPath.toString());
+            if (fileSystem.exists(checkpointPath)) {
+              FileStatus fileStatus = fileSystem.getFileStatus(checkpointPath);
+              BlockLocation[] blocks = fileSystem.getFileBlockLocations(
+                  fileStatus, 0, fileStatus.getLen());
+              hosts = blocks[0].getHosts();
+            } else {
+              hosts = new String[groomStatuses.keySet().size()];
+              groomStatuses.keySet().toArray(hosts);
+            }
+            GroomServerStatus serverStatus = this.allocationStrategy
+                .getGroomToAllocate(groomStatuses, hosts, taskCountInGroomMap,
+                    new BSPResource[0], allTask);
+            Task task = allTask.constructTask(serverStatus);
+            populateAction(task, superstep, serverStatus, actionMap);
+
+          } else {
+            restartTask(allTask, superstep, groomStatuses, actionMap);
           }
+        }
       } else {
         // Start the task from the beginning.
-          for (TaskInProgress allTask : allTasks) {
-              if (recoveryMap.containsKey(allTask.getTaskId())) {
-                  this.allocationStrategy.getGroomToAllocate(groomStatuses,
-                          this.allocationStrategy.selectGrooms(groomStatuses,
-                                  taskCountInGroomMap, new BSPResource[0], allTask),
-                          taskCountInGroomMap, new BSPResource[0], allTask);
-              } else {
-                  restartTask(allTask, superstep, groomStatuses, actionMap);
-              }
+        for (TaskInProgress allTask : allTasks) {
+          if (recoveryMap.containsKey(allTask.getTaskId())) {
+            this.allocationStrategy.getGroomToAllocate(groomStatuses,
+                this.allocationStrategy.selectGrooms(groomStatuses,
+                    taskCountInGroomMap, new BSPResource[0], allTask),
+                taskCountInGroomMap, new BSPResource[0], allTask);
+          } else {
+            restartTask(allTask, superstep, groomStatuses, actionMap);
           }
+        }
       }
     }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java Thu Feb 21 06:38:33 2013
@@ -35,15 +35,15 @@ import org.apache.hama.bsp.taskallocatio
  * <code>BSPFaultTolerantService</code> defines the fault tolerance service
  * behavior. The fault tolerance service is a feature of a running job and not
  * the system. A class defined on this behavior has the responsibility to create
- * two objects. The first object <code>FaultTolerantMasterService</code> is
- * used by the job at BSPMaster to handle fault tolerance related steps at the
+ * two objects. The first object <code>FaultTolerantMasterService</code> is used
+ * by the job at BSPMaster to handle fault tolerance related steps at the
  * master. The second object <code>FaultTolerantPeerService</code> is used to
  * define the behavior of object that would implement the fault tolerance
  * related steps for recovery inside <code>BSPPeer</code> (in each of the BSP
  * peers doing computations)
  */
 public interface BSPFaultTolerantService<M extends Writable> {
-  
+
   /**
    * The token by which a job can register its fault-tolerance service.
    */

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java Thu Feb 21 06:38:33 2013
@@ -27,14 +27,14 @@ import org.apache.hadoop.io.Writable;
  * A compressed representation of BSPMessageBundle.
  * 
  */
-public final class BSPCompressedBundle implements Writable{
+public final class BSPCompressedBundle implements Writable {
 
   private byte[] data;
 
-  public BSPCompressedBundle(){		
+  public BSPCompressedBundle() {
   }
 
-  public BSPCompressedBundle(byte[] data){
+  public BSPCompressedBundle(byte[] data) {
     this.data = data;
   }
 
@@ -53,9 +53,9 @@ public final class BSPCompressedBundle i
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {		
+  public void readFields(DataInput in) throws IOException {
     int len = in.readInt();
-    data    = new byte[len];
+    data = new byte[len];
     in.readFully(data);
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java Thu Feb 21 06:38:33 2013
@@ -46,5 +46,6 @@ public abstract class BSPMessageCompress
    * @param compMsgBundle
    * @return
    */
-  public abstract BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle);
+  public abstract BSPMessageBundle<M> decompressBundle(
+      BSPCompressedBundle compMsgBundle);
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java Thu Feb 21 06:38:33 2013
@@ -18,8 +18,8 @@
 package org.apache.hama.bsp.message.io;
 
 /**
- * The implementation of shared object when there is no spilling in the
- * first place.
+ * The implementation of shared object when there is no spilling in the first
+ * place.
  * 
  */
 class BufferReadStatus extends ReadIndexStatus {
@@ -34,7 +34,7 @@ class BufferReadStatus extends ReadIndex
 
   @Override
   public int getReadBufferIndex() {
-    if(count == index - 1){
+    if (count == index - 1) {
       return -1;
     }
     return ++index;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java Thu Feb 21 06:38:33 2013
@@ -135,7 +135,7 @@ public class ByteBufferInputStream exten
    * @param len length of data to be written
    * @param cur The current size already read by the class.
    * @return if the end of the stream has reached, and cur is 0 return -1; else
-   * return the data size currently read.
+   *         return the data size currently read.
    * @throws IOException
    */
   protected int onBufferRead(byte[] b, int off, int len, int cur)
@@ -153,6 +153,7 @@ public class ByteBufferInputStream exten
 
   /**
    * Sets the byte buffer to read the data from.
+   * 
    * @param buffer The byte buffer to read data from
    * @param toRead Number of bytes till end of last record.
    * @param total Total bytes of data to read in the buffer.
@@ -183,10 +184,11 @@ public class ByteBufferInputStream exten
 
   /**
    * This function should be called to provision reading the partial records
-   * into the buffer after the last record in the buffer is read. This data 
-   * would be appended with the next ByteBuffer that is set using 
+   * into the buffer after the last record in the buffer is read. This data
+   * would be appended with the next ByteBuffer that is set using
    * {@link ByteBufferInputStream#setBuffer(ByteBuffer, long, long)} to start
    * reading records.
+   * 
    * @throws IOException
    */
   public void fillForNext() throws IOException {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java Thu Feb 21 06:38:33 2013
@@ -23,9 +23,9 @@ import java.nio.ByteBuffer;
 
 /**
  * ByteBufferOutputStream encapsulates a byte buffer to write data into. The
- * function {@link ByteBufferOutputStream#onBufferFull(byte[], int, int)} 
- * should be overriden to handle the case when the size of data exceeds the size
- * of buffer. The default behavior is to throw an exception.
+ * function {@link ByteBufferOutputStream#onBufferFull(byte[], int, int)} should
+ * be overriden to handle the case when the size of data exceeds the size of
+ * buffer. The default behavior is to throw an exception.
  */
 class ByteBufferOutputStream extends OutputStream {
 
@@ -43,16 +43,17 @@ class ByteBufferOutputStream extends Out
 
   /**
    * Sets the buffer for the stream.
+   * 
    * @param buffer byte buffer to hold within.
    */
   public void setBuffer(ByteBuffer buffer) {
     this.buffer = buffer;
     this.interBufferDataSize = 0;
-    int interSize = Math.min(buffer.capacity()/2, 8192);
-    if(interBuffer == null){
+    int interSize = Math.min(buffer.capacity() / 2, 8192);
+    if (interBuffer == null) {
       interBuffer = new byte[interSize];
     }
-    
+
   }
 
   @Override
@@ -98,7 +99,7 @@ class ByteBufferOutputStream extends Out
   }
 
   /**
-   * Action to take when the data to be written exceeds the size of the byte 
+   * Action to take when the data to be written exceeds the size of the byte
    * buffer inside.
    * 
    * @return
@@ -116,6 +117,7 @@ class ByteBufferOutputStream extends Out
 
   /**
    * Called when the byte buffer stream is closed.
+   * 
    * @throws IOException
    */
   protected void onFlush() throws IOException {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java Thu Feb 21 06:38:33 2013
@@ -119,7 +119,7 @@ public class CombineSpilledDataProcessor
     try {
       return super.handleSpilledBuffer(new SpilledByteBuffer(
           this.combineOutputBuffer.getBuffer(), this.combineOutputBuffer
-          .getBuffer().remaining()));
+              .getBuffer().remaining()));
     } finally {
       this.combineOutputBuffer.clear();
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java Thu Feb 21 06:38:33 2013
@@ -22,30 +22,30 @@ import java.nio.ByteBuffer;
 
 /**
  * Encapsulates a {@link DirectByteBufferOutputStream}.
- *
+ * 
  */
 public class DirectByteBufferOutputStream extends DataOutputStream {
 
   public DirectByteBufferOutputStream() {
     super(new ByteBufferOutputStream());
   }
-  
-  public DirectByteBufferOutputStream(ByteBufferOutputStream stream){
+
+  public DirectByteBufferOutputStream(ByteBufferOutputStream stream) {
     super(stream);
   }
 
   public ByteBuffer getBuffer() {
-    ByteBufferOutputStream stream = (ByteBufferOutputStream)this.out;
+    ByteBufferOutputStream stream = (ByteBufferOutputStream) this.out;
     return stream.getBuffer();
   }
 
   public void setBuffer(ByteBuffer buffer) {
-    ByteBufferOutputStream stream = (ByteBufferOutputStream)this.out;
+    ByteBufferOutputStream stream = (ByteBufferOutputStream) this.out;
     stream.setBuffer(buffer);
   }
 
-  public void clear(){
-    ByteBufferOutputStream stream = (ByteBufferOutputStream)this.out;
+  public void clear() {
+    ByteBufferOutputStream stream = (ByteBufferOutputStream) this.out;
     stream.clear();
   }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java Thu Feb 21 06:38:33 2013
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hama.Constants;
 
 /**
- * A synchronous i/o stream that is used to write data into and to read back the 
+ * A synchronous i/o stream that is used to write data into and to read back the
  * written data.
  */
 public class DualChannelByteBufferStream {
@@ -73,8 +73,8 @@ public class DualChannelByteBufferStream
     }
     outputMode = false;
   }
-  
-  public void close() throws IOException{
+
+  public void close() throws IOException {
     closeInput();
     closeOutput();
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java Thu Feb 21 06:38:33 2013
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -24,19 +23,19 @@ import java.nio.channels.ReadableByteCha
 import java.nio.channels.WritableByteChannel;
 
 /**
- * A utility class to just hold the byte buffer to read back the written data
- * or write data to read back. Buffer overflow, underflow conditions are not
+ * A utility class to just hold the byte buffer to read back the written data or
+ * write data to read back. Buffer overflow, underflow conditions are not
  * enforced.
- *
+ * 
  */
 public class DuplexByteArrayChannel implements WritableByteChannel,
     ReadableByteChannel {
 
   private boolean open;
   private ByteBuffer buffer;
-  
-  DuplexByteArrayChannel(){
-    
+
+  DuplexByteArrayChannel() {
+
   }
 
   @Override
@@ -71,8 +70,8 @@ public class DuplexByteArrayChannel impl
   public void flip() {
     buffer.flip();
   }
-  
-  public ByteBuffer getBuffer(){
+
+  public ByteBuffer getBuffer() {
     return buffer;
   }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java Thu Feb 21 06:38:33 2013
@@ -28,9 +28,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.bsp.message.io.PreFetchCache;
-import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
-import org.apache.hama.bsp.message.io.SpilledDataReadStatus;
 
 public class PreFetchCache<M extends Writable> {
   private static final Log LOG = LogFactory.getLog(PreFetchCache.class);
@@ -147,7 +144,7 @@ public class PreFetchCache<M extends Wri
     preFetchThread = new PreFetchThread<M>(classObject, objectListArr,
         capacity, buffer, totalMessages, status, conf);
     preFetchThread.start();
-    if(!status.startReading()){
+    if (!status.startReading()) {
       throw new IOException("Failed to start reading the spilled file: ");
     }
     arrIndex = status.getReadBufferIndex();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java Thu Feb 21 06:38:33 2013
@@ -18,15 +18,15 @@
 package org.apache.hama.bsp.message.io;
 
 /**
- * The base class that defines the shared object that synchronizes the
- * indexes for the byte array such that there is no loss of data.
+ * The base class that defines the shared object that synchronizes the indexes
+ * for the byte array such that there is no loss of data.
  * 
  */
 abstract class ReadIndexStatus {
 
   /**
-   * Returns index of the byte array that is ready for the data to be read
-   * from. The implementation may or may not block depending on the spilling
+   * Returns index of the byte array that is ready for the data to be read from.
+   * The implementation may or may not block depending on the spilling
    * situation.
    * 
    * @return the index.
@@ -35,9 +35,9 @@ abstract class ReadIndexStatus {
   public abstract int getReadBufferIndex() throws InterruptedException;
 
   /**
-   * Returns the index of the byte array that could used to load the data
-   * from the spilled file. The implementation may or may not block
-   * depending on the spilling situation.
+   * Returns the index of the byte array that could used to load the data from
+   * the spilled file. The implementation may or may not block depending on the
+   * spilling situation.
    * 
    * @return
    * @throws InterruptedException
@@ -55,4 +55,3 @@ abstract class ReadIndexStatus {
   public abstract boolean startReading();
 
 }
-

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java Thu Feb 21 06:38:33 2013
@@ -49,8 +49,8 @@ public class SpilledByteBuffer {
   public void setRecordClass(Class<? extends Writable> classObj) {
     this.writableClass = classObj;
   }
-  
-  public Class<? extends Writable> getRecordClass(){
+
+  public Class<? extends Writable> getRecordClass() {
     return this.writableClass;
   }
 
@@ -66,14 +66,14 @@ public class SpilledByteBuffer {
   public void markEndOfRecord() {
     this.endOfRecord = this.buffer.position();
   }
-  
-  public void markEndOfRecord(int pos){
-    if(pos < this.buffer.capacity()){
+
+  public void markEndOfRecord(int pos) {
+    if (pos < this.buffer.capacity()) {
       this.endOfRecord = pos;
     }
   }
-  
-  public int getMarkofLastRecord(){
+
+  public int getMarkofLastRecord() {
     return this.endOfRecord;
   }
 
@@ -290,8 +290,8 @@ public class SpilledByteBuffer {
     buffer.put(byteBuffer);
 
   }
-  
-  public int capacity(){
+
+  public int capacity() {
     return this.buffer.capacity();
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java Thu Feb 21 06:38:33 2013
@@ -23,14 +23,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * The implementation of the shared object when the buffer has already
- * spilled to disk.
+ * The implementation of the shared object when the buffer has already spilled
+ * to disk.
  * 
  */
 class SpilledDataReadStatus extends ReadIndexStatus {
 
   private static final Log LOG = LogFactory.getLog(SpilledDataReadStatus.class);
-  
+
   private volatile int readBufferIndex_;
   private volatile int fetchFileBufferIndex_;
   private int totalSize_;
@@ -63,7 +63,7 @@ class SpilledDataReadStatus extends Read
     errorState_ = true;
     notify();
   }
-  
+
   @Override
   public synchronized int getReadBufferIndex() throws InterruptedException {
 
@@ -74,7 +74,8 @@ class SpilledDataReadStatus extends Read
       notify();
     }
     readBufferIndex_ = (readBufferIndex_ + 1) % totalSize_;
-    while (!bufferBitState_.get(readBufferIndex_) && !fileReadComplete_ && !errorState_) {
+    while (!bufferBitState_.get(readBufferIndex_) && !fileReadComplete_
+        && !errorState_) {
       wait();
     }
     // The file is completely read and transferred to buffers already.
@@ -105,8 +106,8 @@ class SpilledDataReadStatus extends Read
     notify();
     fetchFileBufferIndex_ = (fetchFileBufferIndex_ + 1) % totalSize_;
 
-    while (bufferBitState_.get(fetchFileBufferIndex_)
-        && !bufferReadComplete_ && !errorState_) {
+    while (bufferBitState_.get(fetchFileBufferIndex_) && !bufferReadComplete_
+        && !errorState_) {
       wait();
     }
 
@@ -126,8 +127,8 @@ class SpilledDataReadStatus extends Read
   }
 
   /**
-   * Called by the thread to indicate that all the spilled data is
-   * completely read.
+   * Called by the thread to indicate that all the spilled data is completely
+   * read.
    */
   public synchronized void closedBySpiller() {
     fileReadComplete_ = true;
@@ -149,4 +150,3 @@ class SpilledDataReadStatus extends Read
   }
 
 }
-

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java Thu Feb 21 06:38:33 2013
@@ -256,9 +256,9 @@ public class SpillingDataOutputBuffer ex
       startedSpilling_ = false;
       bufferState_.clear();
 
-        for (SpilledByteBuffer aBufferList_ : bufferList_) {
-            aBufferList_.clear();
-        }
+      for (SpilledByteBuffer aBufferList_ : bufferList_) {
+        aBufferList_.clear();
+      }
       currentBuffer_ = bufferList_.get(0);
       bytesWritten_ = 0L;
       bytesRemaining_ = bufferSize_;
@@ -398,7 +398,7 @@ public class SpillingDataOutputBuffer ex
      * @throws IOException
      */
     SpilledByteBuffer getBuffer(int index) throws IOException {
-      if(index < 0){
+      if (index < 0) {
         return null;
       }
       if (index >= bufferList_.size()) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java Thu Feb 21 06:38:33 2013
@@ -22,10 +22,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+
 /**
- * A {@link ByteBuffer} stream that synchronously writes the spilled data to 
+ * A {@link ByteBuffer} stream that synchronously writes the spilled data to
  * local storage.
- *
+ * 
  */
 public class SyncFlushByteBufferOutputStream extends ByteBufferOutputStream {
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java Thu Feb 21 06:38:33 2013
@@ -25,6 +25,7 @@ import java.nio.channels.FileChannel;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 /**
  * A {@link ByteBuffer} input stream that synchronously reads from spilled data.
  * Uses {@link DuplexByteArrayChannel} within.

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java Thu Feb 21 06:38:33 2013
@@ -37,7 +37,7 @@ public class WriteSpilledDataProcessor i
 
   private FileChannel fileChannel;
   private String fileName;
-  
+
   public WriteSpilledDataProcessor(String fileName)
       throws FileNotFoundException {
     this.fileName = fileName;
@@ -63,11 +63,11 @@ public class WriteSpilledDataProcessor i
   @Override
   public boolean handleSpilledBuffer(SpilledByteBuffer buffer) {
     try {
-      
-      if(fileChannel == null){
+
+      if (fileChannel == null) {
         initializeFileChannel();
       }
-      
+
       fileChannel.write(buffer.getByteBuffer());
       fileChannel.force(true);
       return true;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Thu Feb 21 06:38:33 2013
@@ -46,7 +46,8 @@ import org.apache.hama.bsp.TaskAttemptID
  * configuration. <br/>
  * <b>It is experimental to use.</b>
  */
-public final class DiskQueue<M extends Writable> implements MessageQueue<M>, MessageTransferQueue<M> {
+public final class DiskQueue<M extends Writable> implements MessageQueue<M>,
+    MessageTransferQueue<M> {
 
   public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Thu Feb 21 06:38:33 2013
@@ -29,7 +29,8 @@ import org.apache.hama.bsp.TaskAttemptID
 /**
  * LinkedList backed queue structure for bookkeeping messages.
  */
-public final class MemoryQueue<M extends Writable> implements MessageQueue<M>, MessageTransferQueue<M> {
+public final class MemoryQueue<M extends Writable> implements MessageQueue<M>,
+    MessageTransferQueue<M> {
 
   private final Deque<M> deque = new ArrayDeque<M>();
   private Configuration conf;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Thu Feb 21 06:38:33 2013
@@ -79,7 +79,7 @@ public interface MessageQueue<M> extends
    * @return how many items are in the queue.
    */
   public int size();
-  
+
   /**
    * 
    * @return true if the messages in the queue are serialized to byte buffers.

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java Thu Feb 21 06:38:33 2013
@@ -19,16 +19,16 @@ package org.apache.hama.bsp.message.queu
 
 /**
  * 
- *
+ * 
  * @param <M>
  */
 public interface MessageTransferQueue<M> {
-  
+
   /**
    * 
    */
   public MessageQueue<M> getSenderQueue();
-  
+
   /**
    * 
    */

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java Thu Feb 21 06:38:33 2013
@@ -17,12 +17,11 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-
 /**
  * Synchronized Queue interface. Can be used to implement better synchronized
  * datastructures.
  */
-public interface SynchronizedQueue<T> extends MessageQueue<T>{
+public interface SynchronizedQueue<T> extends MessageQueue<T> {
 
   public abstract MessageQueue<T> getMessageQueue();
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java Thu Feb 21 06:38:33 2013
@@ -19,7 +19,7 @@ package org.apache.hama.bsp.sync;
 
 import org.apache.hama.HamaConfiguration;
 
-public abstract class BSPMasterSyncClient implements MasterSyncClient{
+public abstract class BSPMasterSyncClient implements MasterSyncClient {
 
   /**
    * Initialize the Synchronization client.
@@ -27,24 +27,26 @@ public abstract class BSPMasterSyncClien
    * @param conf The configuration parameters to initialize the client.
    */
   public abstract void init(HamaConfiguration conf);
-  
+
   /**
    * Clears all information stored.
    */
   public abstract void clear();
-  
+
   /**
-   * Register a newly added job 
+   * Register a newly added job
+   * 
    * @param string
    */
   public abstract void registerJob(String string);
 
   /**
    * Deregister the job from the system.
+   * 
    * @param string
    */
   public abstract void deregisterJob(String string);
-    
+
   /**
    * Closes the client.
    */

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java Thu Feb 21 06:38:33 2013
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hama.bsp.BSPJobID;
 import org.apache.hama.bsp.TaskAttemptID;
 
-public abstract class BSPPeerSyncClient implements PeerSyncClient{
+public abstract class BSPPeerSyncClient implements PeerSyncClient {
 
   /**
    * Init will be called within a spawned task, it should be used to initialize
@@ -32,8 +32,8 @@ public abstract class BSPPeerSyncClient 
    * 
    * @throws Exception
    */
-  public abstract void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
-      throws Exception;
+  public abstract void init(Configuration conf, BSPJobID jobId,
+      TaskAttemptID taskId) throws Exception;
 
   /**
    * Enters the barrier before the message sending in each superstep.
@@ -43,8 +43,8 @@ public abstract class BSPPeerSyncClient 
    * @param superstep the superstep of the task
    * @throws SyncException
    */
-  public abstract void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
-      throws SyncException;
+  public abstract void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
+      long superstep) throws SyncException;
 
   /**
    * Leaves the barrier after all communication has been done, this is usually
@@ -55,8 +55,8 @@ public abstract class BSPPeerSyncClient 
    * @param superstep the superstep of the task
    * @throws SyncException
    */
-  public abstract void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
-      throws SyncException;
+  public abstract void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
+      long superstep) throws SyncException;
 
   /**
    * Registers a specific task with a its host and port to the sync daemon.
@@ -89,8 +89,8 @@ public abstract class BSPPeerSyncClient 
    * @param hostAddress
    * @param port
    */
-  public abstract void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
-      String hostAddress, long port);
+  public abstract void deregisterFromBarrier(BSPJobID jobId,
+      TaskAttemptID taskId, String hostAddress, long port);
 
   /**
    * This stops the sync daemon. Only used in YARN.
@@ -104,5 +104,4 @@ public abstract class BSPPeerSyncClient 
    */
   public abstract void close() throws IOException;
 
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java Thu Feb 21 06:38:33 2013
@@ -20,11 +20,11 @@ package org.apache.hama.bsp.sync;
 import org.apache.hama.HamaConfiguration;
 
 /**
- * MasterSyncClient defines the behavior that BSPMaster should follow
- * to perform different required globally synchronized state changes. 
- *
+ * MasterSyncClient defines the behavior that BSPMaster should follow to perform
+ * different required globally synchronized state changes.
+ * 
  */
-public interface MasterSyncClient extends SyncClient{
+public interface MasterSyncClient extends SyncClient {
 
   /**
    * Initialize the Synchronization client.
@@ -32,22 +32,24 @@ public interface MasterSyncClient extend
    * @param conf The configuration parameters to initialize the client.
    */
   public void init(HamaConfiguration conf);
-  
+
   /**
    * Clears all information stored.
    */
   public void clear();
-  
+
   /**
-   * Register a newly added job 
+   * Register a newly added job
+   * 
    * @param string
    */
   public void registerJob(String string);
 
   /**
    * Deregister the job from the system.
+   * 
    * @param string
    */
   public void deregisterJob(String string);
-    
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java Thu Feb 21 06:38:33 2013
@@ -23,10 +23,10 @@ import org.apache.hama.bsp.TaskAttemptID
 
 /**
  * PeerSyncClient defines the behavior that a BSPPeer performs to maintain
- * synchronized global state as it progresses. 
+ * synchronized global state as it progresses.
  */
 
-public interface PeerSyncClient extends SyncClient{
+public interface PeerSyncClient extends SyncClient {
 
   /**
    * Init will be called within a spawned task, it should be used to initialize

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java Thu Feb 21 06:38:33 2013
@@ -29,66 +29,76 @@ import org.apache.hama.bsp.BSPJobID;
 public interface SyncClient {
 
   /**
-   * Construct key in the format required by the SyncClient for storing and 
+   * Construct key in the format required by the SyncClient for storing and
    * retrieving information. This function is recommended to use to construct
    * keys for storing keys.
+   * 
    * @param jobId The BSP Job Id.
    * @param args The list of String objects that would be used to construct key
    * @return The key consisting of entities provided in the required format.
    */
-  public String constructKey(BSPJobID jobId, String ... args);
+  public String constructKey(BSPJobID jobId, String... args);
 
   /**
    * Stores value for the specified key.
-   * @param key The key for which value should be stored. It is recommended to use 
-   * <code>constructKey</code> to create key object.
+   * 
+   * @param key The key for which value should be stored. It is recommended to
+   *          use <code>constructKey</code> to create key object.
    * @param value The value to be stored.
-   * @param permanent true if the value should be persisted after end of session.
-   * @param Listener object that provides asynchronous updates on the state 
-   * of information stored under the key.
+   * @param permanent true if the value should be persisted after end of
+   *          session.
+   * @param Listener object that provides asynchronous updates on the state of
+   *          information stored under the key.
    * @return true if the operation was successful.
    */
-  public boolean storeInformation(String key, Writable value, 
+  public boolean storeInformation(String key, Writable value,
       boolean permanent, SyncEventListener listener);
 
   /**
    * Retrieve value previously store for the key.
+   * 
    * @param key The key for which value was stored.
    * @param classType The expected class instance of value to be extracted
    * @return the value if found. Returns null if there was any error of if there
-   * was no value stored for the key.
+   *         was no value stored for the key.
    */
   public boolean getInformation(String key, Writable valueHolder);
 
   /**
    * Store new key in key set.
-   * @param key The key to be saved in key set. It is recommended to use 
-   * <code>constructKey</code> to create key object. 
-   * @param permanent true if the value should be persisted after end of session.
-   * @param listener Listener object that asynchronously notifies the events 
-   * related to the key.
+   * 
+   * @param key The key to be saved in key set. It is recommended to use
+   *          <code>constructKey</code> to create key object.
+   * @param permanent true if the value should be persisted after end of
+   *          session.
+   * @param listener Listener object that asynchronously notifies the events
+   *          related to the key.
    * @return true if operation was successful.
    */
-  public boolean addKey(String key, boolean permanent, SyncEventListener listener);
+  public boolean addKey(String key, boolean permanent,
+      SyncEventListener listener);
 
   /**
    * Check if key was previously stored.
-   * @param key The value of the key. 
+   * 
+   * @param key The value of the key.
    * @return true if the key exists.
    */
   public boolean hasKey(String key);
-  
+
   /**
-  * Get list of child keys stored under the key provided.
-  * @param key The key whose child key set are to be found.
-  * @param listener Listener object that asynchronously notifies the changes 
-  * under the provided key
-  * @return Array of child keys.
-  */
+   * Get list of child keys stored under the key provided.
+   * 
+   * @param key The key whose child key set are to be found.
+   * @param listener Listener object that asynchronously notifies the changes
+   *          under the provided key
+   * @return Array of child keys.
+   */
   public String[] getChildKeySet(String key, SyncEventListener listener);
 
   /**
    * Register a listener for events on the key.
+   * 
    * @param key The key on which an event listener should be registered.
    * @param event for which the listener is registered for.
    * @param listener The event listener that defines how to process the event.
@@ -99,15 +109,16 @@ public interface SyncClient {
 
   /**
    * Delete the key and the information stored under it.
+   * 
    * @param key
    * @param listener
    * @return
    */
   public boolean remove(String key, SyncEventListener listener);
-  
+
   /**
    * 
    */
   public void close() throws IOException;
-  
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java Thu Feb 21 06:38:33 2013
@@ -18,14 +18,15 @@
 package org.apache.hama.bsp.sync;
 
 /**
- * A distributed global synchronization event.   
+ * A distributed global synchronization event.
  */
 public interface SyncEvent {
-  
+
   /**
-   * Returns the event identifier in the scheme of events defined for the
-   * global synchronization service.
-   * @return the event identifier 
+   * Returns the event identifier in the scheme of events defined for the global
+   * synchronization service.
+   * 
+   * @return the event identifier
    */
   public int getEventId();
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java Thu Feb 21 06:38:33 2013
@@ -19,13 +19,14 @@ package org.apache.hama.bsp.sync;
 
 /**
  * This class is used to define a listener to the synchronized global event.
- *
+ * 
  */
 public abstract class SyncEventListener {
-  
+
   /**
-   * Every event is identified by an event identifier. You can refer to 
+   * Every event is identified by an event identifier. You can refer to
    * <code>SyncEvent</code> class.
+   * 
    * @param eventId The event identification code.
    */
   public abstract void handleEvent(int eventId);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncException.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncException.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncException.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncException.java Thu Feb 21 06:38:33 2013
@@ -22,7 +22,7 @@ public class SyncException extends Excep
    * 
    */
   private static final long serialVersionUID = 1L;
-  
+
   String info;
 
   public SyncException(String info) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java Thu Feb 21 06:38:33 2013
@@ -18,6 +18,7 @@
 package org.apache.hama.bsp.sync;
 
 import java.util.concurrent.Callable;
+
 import org.apache.hadoop.conf.Configuration;
 
 /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java Thu Feb 21 06:38:33 2013
@@ -30,17 +30,15 @@ public class SyncServiceFactory {
    */
   public static PeerSyncClient getPeerSyncClient(Configuration conf)
       throws ClassNotFoundException {
-    return (PeerSyncClient) ReflectionUtils
-        .newInstance(conf.getClassByName(conf.get(SYNC_PEER_CLASS,
+    return (PeerSyncClient) ReflectionUtils.newInstance(conf
+        .getClassByName(conf.get(SYNC_PEER_CLASS,
             ZooKeeperSyncClientImpl.class.getName())), conf);
   }
 
-  
   public static SyncClient getMasterSyncClient(Configuration conf)
-		  throws ClassNotFoundException {
-	  return (SyncClient) ReflectionUtils
-			  .newInstance(conf.getClassByName(conf.get(SYNC_MASTER_CLASS,
-					  ZKSyncBSPMasterClient.class.getName())), conf);
+      throws ClassNotFoundException {
+    return (SyncClient) ReflectionUtils.newInstance(conf.getClassByName(conf
+        .get(SYNC_MASTER_CLASS, ZKSyncBSPMasterClient.class.getName())), conf);
   }
 
   /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java Thu Feb 21 06:38:33 2013
@@ -187,8 +187,8 @@ public abstract class ZKSyncClient imple
    * @return The instance of Writable object.
    * @throws IOException
    */
-  protected boolean getValueFromBytes(byte[] data,
-      Writable valueHolder) throws IOException {
+  protected boolean getValueFromBytes(byte[] data, Writable valueHolder)
+      throws IOException {
     if (data != null) {
       ByteArrayInputStream istream = new ByteArrayInputStream(data);
       DataInputStream diStream = new DataInputStream(istream);
@@ -210,8 +210,7 @@ public abstract class ZKSyncClient imple
    * @return The Writable object constructed from the value read from the
    *         Zookeeper node.
    */
-  protected boolean extractData(String path,
-      Writable valueHolder) {
+  protected boolean extractData(String path, Writable valueHolder) {
     try {
       Stat stat = getStat(path);
       if (stat != null) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java Thu Feb 21 06:38:33 2013
@@ -18,49 +18,47 @@
 package org.apache.hama.bsp.sync;
 
 /**
- * Zookeeper Synchronization Event Factory. 
- * <ul>It provides three event definitions. 
+ * Zookeeper Synchronization Event Factory.
+ * <ul>
+ * It provides three event definitions.
  * <li>Value stored in a Zookeeper node is changed
  * <li>A new child node is added to a Zookeeper node.
  * <li>A Zookeeper node is deleted.
  * </ul>
  */
 public class ZKSyncEventFactory {
-  
-  public static enum ZKEvent{
-    VALUE_CHANGE_EVENT(0),
-    CHILD_ADD_EVENT(1),
-    DELETE_EVENT(2);
-    
+
+  public static enum ZKEvent {
+    VALUE_CHANGE_EVENT(0), CHILD_ADD_EVENT(1), DELETE_EVENT(2);
+
     private final int id;
-    
-    ZKEvent(int num){
+
+    ZKEvent(int num) {
       this.id = num;
     }
-    
-    public int getValue(){
+
+    public int getValue() {
       return this.id;
     }
-    
-    public static int getEventCount(){
+
+    public static int getEventCount() {
       return ZKEvent.values().length;
     }
-    
-    public String getName(int num){
-      if(num >=0 && num < ZKEvent.getEventCount()){
+
+    public String getName(int num) {
+      if (num >= 0 && num < ZKEvent.getEventCount()) {
         return ZKEvent.values()[num].name();
-      }
-      else 
-        throw new IllegalArgumentException((new StringBuilder(100)
-              .append("The value ")
-              .append(num).append(" is not a valid ZKEvent type. ")
-              .append("Expected range is 0-")
-              .append(getEventCount()-1)).toString());
+      } else
+        throw new IllegalArgumentException(
+            (new StringBuilder(100).append("The value ").append(num)
+                .append(" is not a valid ZKEvent type. ")
+                .append("Expected range is 0-").append(getEventCount() - 1))
+                .toString());
     }
-    
+
   }
 
-    public static int getSupportedEventCount(){
+  public static int getSupportedEventCount() {
     return ZKEvent.getEventCount();
   }
 
@@ -70,50 +68,52 @@ public class ZKSyncEventFactory {
     public int getEventId() {
       return ZKEvent.VALUE_CHANGE_EVENT.getValue();
     }
-    
+
   }
-  
+
   private static class ChildAddEvent implements SyncEvent {
 
     @Override
     public int getEventId() {
       return ZKEvent.CHILD_ADD_EVENT.getValue();
     }
-    
+
   }
-  
+
   private static class DeleteEvent implements SyncEvent {
 
     @Override
     public int getEventId() {
       return ZKEvent.DELETE_EVENT.getValue();
     }
-    
+
   }
-  
+
   /**
    * Provides the Zookeeper node value change event definition.
+   * 
    * @return the Zookeeper value changed event.
    */
-  public static SyncEvent getValueChangeEvent(){
+  public static SyncEvent getValueChangeEvent() {
     return new ValueChangeEvent();
   }
-  
+
   /**
    * Provides the Zookeeper deletion event definition.
+   * 
    * @return the Zookeeper node is deleted event
    */
-  public static SyncEvent getDeletionEvent(){
+  public static SyncEvent getDeletionEvent() {
     return new DeleteEvent();
   }
-  
+
   /**
-   * Provides the Zookeeper child addition event definition. 
+   * Provides the Zookeeper child addition event definition.
+   * 
    * @return the Zookeeper child node is added event
    */
-  public static SyncEvent getChildAddEvent(){
+  public static SyncEvent getChildAddEvent() {
     return new ChildAddEvent();
   }
-  
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java Thu Feb 21 06:38:33 2013
@@ -23,10 +23,10 @@ import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 
-public abstract class ZKSyncEventListener extends SyncEventListener
- implements Watcher {
-Log LOG = LogFactory.getLog(SyncEventListener.class);
-  
+public abstract class ZKSyncEventListener extends SyncEventListener implements
+    Watcher {
+  Log LOG = LogFactory.getLog(SyncEventListener.class);
+
   private ZKSyncClient client;
   private SyncEvent event;
 
@@ -35,39 +35,36 @@ Log LOG = LogFactory.getLog(SyncEventLis
    */
   @Override
   public void process(WatchedEvent event) {
-    
+
     client.registerListener(event.getPath(),
-        ZKSyncEventFactory.getValueChangeEvent()
-        , this);    
-    //if(LOG.isDebugEnabled()){
-      LOG.debug(event.toString());
-    //}
+        ZKSyncEventFactory.getValueChangeEvent(), this);
+    // if(LOG.isDebugEnabled()){
+    LOG.debug(event.toString());
+    // }
 
-    if(event.getType().equals(EventType.NodeChildrenChanged)){
+    if (event.getType().equals(EventType.NodeChildrenChanged)) {
       LOG.debug("Node children changed - " + event.getPath());
       onChildKeySetChange();
-    }
-    else if (event.getType().equals(EventType.NodeDeleted)){
+    } else if (event.getType().equals(EventType.NodeDeleted)) {
       LOG.debug("Node children deleted - " + event.getPath());
       onDelete();
-    }
-    else if (event.getType().equals(EventType.NodeDataChanged)){
+    } else if (event.getType().equals(EventType.NodeDataChanged)) {
       LOG.debug("Node children changed - " + event.getPath());
-      
+
       onChange();
     }
 
   }
-  
-  public void setZKSyncClient(ZKSyncClient zkClient){
+
+  public void setZKSyncClient(ZKSyncClient zkClient) {
     client = zkClient;
   }
-  
-  public void setSyncEvent(SyncEvent event){
+
+  public void setSyncEvent(SyncEvent event) {
     this.event = event;
   }
-  
-  public SyncEvent getEvent(){
+
+  public SyncEvent getEvent() {
     return this.event;
   }
 
@@ -88,7 +85,7 @@ Log LOG = LogFactory.getLog(SyncEventLis
 
   @Override
   public void handleEvent(int eventId) {
-    
+
   }
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Thu Feb 21 06:38:33 2013
@@ -276,9 +276,9 @@ public class ZooKeeperSyncClientImpl ext
     if (allPeers == null) {
       TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
       try {
-          List<String> var = zk.getChildren(constructKey(taskId.getJobID(), "peers"),
-                  this);
-          allPeers = var.toArray(new String[var.size()]);
+        List<String> var = zk.getChildren(
+            constructKey(taskId.getJobID(), "peers"), this);
+        allPeers = var.toArray(new String[var.size()]);
 
         for (String s : allPeers) {
           byte[] data = zk.getData(constructKey(taskId.getJobID(), "peers", s),

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java Thu Feb 21 06:38:33 2013
@@ -22,7 +22,6 @@ import javax.management.InstanceNotFound
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.sync.SyncServer;
 import org.apache.hama.util.BSPNetUtils;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.hama.zookeeper.QuorumPeer.ShutdownableZooKeeperServerMain;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java Thu Feb 21 06:38:33 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.hama.bsp.taskallocation;
 
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -50,19 +49,20 @@ public class BestEffortDataLocalTaskAllo
    * @param tasksInGroomMap
    * @return
    */
-  private static String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
+  private static String getAnyGroomToSchedule(
+      Map<String, GroomServerStatus> grooms,
       Map<GroomServerStatus, Integer> tasksInGroomMap) {
 
-      for (String s : grooms.keySet()) {
-          GroomServerStatus groom = grooms.get(s);
-          if (groom == null)
-              continue;
-          Integer taskInGroom = tasksInGroomMap.get(groom);
-          taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-          if (taskInGroom < groom.getMaxTasks()) {
-              return groom.getGroomHostName();
-          }
+    for (String s : grooms.keySet()) {
+      GroomServerStatus groom = grooms.get(s);
+      if (groom == null)
+        continue;
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+      if (taskInGroom < groom.getMaxTasks()) {
+        return groom.getGroomHostName();
       }
+    }
     return null;
   }
 
@@ -79,26 +79,27 @@ public class BestEffortDataLocalTaskAllo
       Map<GroomServerStatus, Integer> tasksInGroomMap,
       String[] possibleLocations) {
 
-      for (String location : possibleLocations) {
-          GroomServerStatus groom = grooms.get(location);
-          if (groom == null) {
-              if (LOG.isDebugEnabled()) {
-                  LOG.debug("Could not find groom for location " + location);
-              }
-              continue;
-          }
-          Integer taskInGroom = tasksInGroomMap.get(groom);
-          taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-          if (LOG.isDebugEnabled()) {
-              LOG.debug("taskInGroom = " + taskInGroom + " max tasks = " + groom.getMaxTasks()
-                      + " location = " + location + " groomhostname = " + groom.getGroomHostName());
-          }
-          if (taskInGroom < groom.getMaxTasks()
-                  && location.equals(groom.getGroomHostName())) {
-              return groom.getGroomHostName();
-          }
+    for (String location : possibleLocations) {
+      GroomServerStatus groom = grooms.get(location);
+      if (groom == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Could not find groom for location " + location);
+        }
+        continue;
+      }
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("taskInGroom = " + taskInGroom + " max tasks = "
+            + groom.getMaxTasks() + " location = " + location
+            + " groomhostname = " + groom.getGroomHostName());
       }
-    if(LOG.isDebugEnabled()){
+      if (taskInGroom < groom.getMaxTasks()
+          && location.equals(groom.getGroomHostName())) {
+        return groom.getGroomHostName();
+      }
+    }
+    if (LOG.isDebugEnabled()) {
       LOG.debug("Returning null");
     }
     return null;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java Thu Feb 21 06:38:33 2013
@@ -22,24 +22,25 @@ import org.apache.hama.bsp.TaskInProgres
 
 /**
  * <code>RawSplitResource</code> defines the data block resource that could be
- * used to find which groom to schedule for data-locality. 
+ * used to find which groom to schedule for data-locality.
  */
-public class RawSplitResource extends BSPResource{
+public class RawSplitResource extends BSPResource {
 
   private RawSplit split;
-  
-  public RawSplitResource(){
-    
+
+  public RawSplitResource() {
+
   }
-  
+
   /**
    * Initialize the resource with data block split information.
+   * 
    * @param split The data-split provided by <code>BSPJobClient</client>
    */
-  public RawSplitResource(RawSplit split){
+  public RawSplitResource(RawSplit split) {
     this.split = split;
   }
-  
+
   @Override
   public String[] getGrooms(TaskInProgress tip) {
     return split.getLocations();

Modified: hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java Thu Feb 21 06:38:33 2013
@@ -350,12 +350,12 @@ public class HttpServer {
     }
     if (needClientAuth) {
       // setting up SSL truststore for authenticating clients
-      System.setProperty("javax.net.ssl.trustStore", sslConf.get(
-          "ssl.server.truststore.location", ""));
-      System.setProperty("javax.net.ssl.trustStorePassword", sslConf.get(
-          "ssl.server.truststore.password", ""));
-      System.setProperty("javax.net.ssl.trustStoreType", sslConf.get(
-          "ssl.server.truststore.type", "jks"));
+      System.setProperty("javax.net.ssl.trustStore",
+          sslConf.get("ssl.server.truststore.location", ""));
+      System.setProperty("javax.net.ssl.trustStorePassword",
+          sslConf.get("ssl.server.truststore.password", ""));
+      System.setProperty("javax.net.ssl.trustStoreType",
+          sslConf.get("ssl.server.truststore.type", "jks"));
     }
     SslSocketConnector sslListener = new SslSocketConnector();
     sslListener.setHost(addr.getHostName());

Modified: hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java Thu Feb 21 06:38:33 2013
@@ -54,7 +54,7 @@ public interface BSPPeerProtocol extends
 
   /** Report that the task encounted a fatal error. */
   void fatalError(TaskAttemptID taskId, String message) throws IOException;
-  
+
   /**
    * Report child's progress to parent.
    * 

Modified: hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java Thu Feb 21 06:38:33 2013
@@ -22,15 +22,14 @@ import java.io.IOException;
 import org.apache.hama.bsp.Directive;
 
 /**
- * A protocol for BSPMaster talks to GroomServer. 
+ * A protocol for BSPMaster talks to GroomServer.
  */
 public interface GroomProtocol extends HamaRPCProtocolVersion {
 
   /**
    * Instruct GroomServer performaning tasks.
    * 
-   * @param directive instructs a GroomServer performing necessary
-   *        execution.
+   * @param directive instructs a GroomServer performing necessary execution.
    * @throws IOException
    */
   void dispatch(Directive directive) throws IOException;

Modified: hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java Thu Feb 21 06:38:33 2013
@@ -23,16 +23,16 @@ import org.apache.hama.bsp.Directive;
 import org.apache.hama.bsp.GroomServerStatus;
 
 /**
- * A new protocol for GroomServers communicate with BSPMaster. This
- * protocol paired with WorkerProtocl, let GroomServers enrol with 
- * BSPMaster, so that BSPMaster can dispatch tasks to GroomServers.
+ * A new protocol for GroomServers communicate with BSPMaster. This protocol
+ * paired with WorkerProtocl, let GroomServers enrol with BSPMaster, so that
+ * BSPMaster can dispatch tasks to GroomServers.
  */
 public interface MasterProtocol extends HamaRPCProtocolVersion {
 
   /**
    * A GroomServer register with its status to BSPMaster, which will update
    * GroomServers cache.
-   *
+   * 
    * @param status to be updated in cache.
    * @return true if successfully register with BSPMaster; false if fail.
    */
@@ -40,7 +40,8 @@ public interface MasterProtocol extends 
 
   /**
    * A GroomServer (periodically) reports task statuses back to the BSPMaster.
-   * @param directive 
+   * 
+   * @param directive
    */
   boolean report(Directive directive) throws IOException;
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java Thu Feb 21 06:38:33 2013
@@ -34,42 +34,42 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.monitor.Monitor.Task;
 
 /**
- * Configurator loads and configure jar files.  
+ * Configurator loads and configure jar files.
  */
 public final class Configurator {
 
-  public static final Log LOG = LogFactory.getLog(Configurator.class); 
+  public static final Log LOG = LogFactory.getLog(Configurator.class);
   public static final String DEFAULT_PLUGINS_DIR = "plugins";
-  private static final ConcurrentMap<String, Long> repos = 
-    new ConcurrentHashMap<String, Long>();
+  private static final ConcurrentMap<String, Long> repos = new ConcurrentHashMap<String, Long>();
 
   /**
    * Configure plugins directory for monitoring GroomServer.
+   * 
    * @param conf file points out the plugin dir location.
-   * @return Map contains jar path and task to be executed; null if 
-   *             plugin directory, default set to $HAMA_HOME/plugins, doesn't 
-   *             exist. 
-   */ 
-  public static Map<String, Task> configure(HamaConfiguration conf, 
+   * @return Map contains jar path and task to be executed; null if plugin
+   *         directory, default set to $HAMA_HOME/plugins, doesn't exist.
+   */
+  public static Map<String, Task> configure(HamaConfiguration conf,
       MonitorListener listener) throws IOException {
     String hamaHome = System.getProperty("hama.home.dir");
-    String pluginPath = conf.get("bsp.monitor.plugins.dir", 
-      hamaHome+File.separator+DEFAULT_PLUGINS_DIR);
+    String pluginPath = conf.get("bsp.monitor.plugins.dir", hamaHome
+        + File.separator + DEFAULT_PLUGINS_DIR);
     File pluginDir = new File(pluginPath);
-    if(null == pluginDir || null == pluginDir.listFiles()) return null; 
+    if (null == pluginDir || null == pluginDir.listFiles())
+      return null;
     ClassLoader loader = Thread.currentThread().getContextClassLoader();
     Map<String, Task> taskList = new HashMap<String, Task>();
-    LOG.debug("Scanning jar files within "+pluginDir+".");
-    for(File jar: pluginDir.listFiles()) {
+    LOG.debug("Scanning jar files within " + pluginDir + ".");
+    for (File jar : pluginDir.listFiles()) {
       String jarPath = jar.getPath();
       Long timestamp = repos.get(jarPath);
-      if(null == timestamp || jar.lastModified() > timestamp) {
+      if (null == timestamp || jar.lastModified() > timestamp) {
         Task t = load(jar, loader);
-        if(null != t) {
+        if (null != t) {
           t.setListener(listener);
           taskList.put(jarPath, t);
           repos.put(jarPath, jar.lastModified());
-          LOG.debug(jar.getName()+" is loaded.");
+          LOG.debug(jar.getName() + " is loaded.");
         }
       }
     }
@@ -78,8 +78,9 @@ public final class Configurator {
 
   /**
    * Load jar from specified path.
+   * 
    * @param path to the jar file.
-   * @param loader of the current thread.  
+   * @param loader of the current thread.
    * @return task to be run.
    */
   private static Task load(File path, ClassLoader loader) throws IOException {
@@ -87,14 +88,14 @@ public final class Configurator {
     Manifest manifest = jar.getManifest();
     String pkg = manifest.getMainAttributes().getValue("Package");
     String main = manifest.getMainAttributes().getValue("Main-Class");
-    if(null == pkg || null == main ) 
-      throw new NullPointerException("Package or main class not found "+
-      "in menifest file.");
+    if (null == pkg || null == main)
+      throw new NullPointerException("Package or main class not found "
+          + "in menifest file.");
     String namespace = pkg + File.separator + main;
     namespace = namespace.replaceAll(File.separator, ".");
-    LOG.debug("Task class to be loaded: "+namespace);
-    URLClassLoader child = 
-      new URLClassLoader(new URL[]{path.toURI().toURL()}, loader); 
+    LOG.debug("Task class to be loaded: " + namespace);
+    URLClassLoader child = new URLClassLoader(
+        new URL[] { path.toURI().toURL() }, loader);
     Thread.currentThread().setContextClassLoader(child);
     Class<?> taskClass = null;
     try {
@@ -102,16 +103,17 @@ public final class Configurator {
     } catch (ClassNotFoundException cnfe) {
       LOG.warn("Task class is not found.", cnfe);
     }
-    if(null == taskClass) return null;
+    if (null == taskClass)
+      return null;
 
     try {
-      return (Task)taskClass.newInstance();
-    } catch(InstantiationException ie) {
-      LOG.warn("Unable to instantiate task class."+namespace, ie);
-    } catch(IllegalAccessException iae) {
+      return (Task) taskClass.newInstance();
+    } catch (InstantiationException ie) {
+      LOG.warn("Unable to instantiate task class." + namespace, ie);
+    } catch (IllegalAccessException iae) {
       LOG.warn(iae);
     }
     return null;
   }
-  
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java Thu Feb 21 06:38:33 2013
@@ -19,8 +19,8 @@ package org.apache.hama.monitor;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * Represents a record containing multiple metrics.

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java Thu Feb 21 06:38:33 2013
@@ -24,36 +24,41 @@ public final class MetricsTag {
   private final String name;
   private final String value;
 
-  public MetricsTag(String name, String value){
+  public MetricsTag(String name, String value) {
     this.name = name;
     this.value = value;
   }
 
-  public final String name(){
+  public final String name() {
     return this.name;
   }
 
-  public final String value(){
+  public final String value() {
     return this.value;
   }
 
   @Override
-  public boolean equals(Object target){
-   if (target == this) return true;
-    if (null == target) return false;
-    if (getClass() != target.getClass()) return false;
- 
+  public boolean equals(Object target) {
+    if (target == this)
+      return true;
+    if (null == target)
+      return false;
+    if (getClass() != target.getClass())
+      return false;
+
     MetricsTag t = (MetricsTag) target;
-    if(!t.name.equals(name)) return false;
-    if(!t.value.equals(value)) return false;
+    if (!t.name.equals(name))
+      return false;
+    if (!t.value.equals(value))
+      return false;
     return true;
   }
 
   @Override
-  public int hashCode(){
+  public int hashCode() {
     int result = 17;
     result = 37 * result + name().hashCode();
     result = 37 * result + value().hashCode();
-    return result; 
+    return result;
   }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java Thu Feb 21 06:38:33 2013
@@ -215,7 +215,7 @@ public final class Monitor extends Threa
             // znode must exists so that child (znode/name) can be created.
             if (null != this.zk.exists(znode, false)) {
               String suffix = suffix(value);
-                if (LOG.isDebugEnabled()) {
+              if (LOG.isDebugEnabled()) {
                 LOG.debug("Publish name [" + name + "] and value [" + value
                     + "] to zk.");
               }

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java Thu Feb 21 06:38:33 2013
@@ -17,17 +17,17 @@
  */
 package org.apache.hama.monitor;
 
-import org.apache.hama.monitor.Monitor.Result; 
+import org.apache.hama.monitor.Monitor.Result;
 
 /**
- * MonitorListener passes the result for notification. 
+ * MonitorListener passes the result for notification.
  */
-public interface MonitorListener { 
-  
+public interface MonitorListener {
+
   /**
-   * When an event is triggered, the task passes the result to notify
-   * the monitor.
+   * When an event is triggered, the task passes the result to notify the
+   * monitor.
    */
-  void notify(Result result); 
+  void notify(Result result);
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java Thu Feb 21 06:38:33 2013
@@ -115,7 +115,8 @@ public final class ZKCollector implement
             LOG.info("metrics " + name + " value:" + lv);
             record.add(new Metric<Long>(name, lv));
           } else if ("b".equals(dataType)) {
-            LOG.info("metrics" + name + " value:" + Arrays.toString(dataInBytes));
+            LOG.info("metrics" + name + " value:"
+                + Arrays.toString(dataInBytes));
             record.add(new Metric<byte[]>(name, dataInBytes));
           } else {
             LOG.warn("Unkown data type for metrics name: " + child);

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeEventListener.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeEventListener.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeEventListener.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeEventListener.java Thu Feb 21 06:38:33 2013
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.monitor.fd;
 
-import org.apache.hama.monitor.fd.NodeStatus;
-
 /**
  * Notify when an event happens.
  */
@@ -26,6 +24,7 @@ public interface NodeEventListener {
 
   /**
    * Notify the node status.
+   * 
    * @param status status of the groom server.
    * @param host name of the groom server.
    */
@@ -33,12 +32,13 @@ public interface NodeEventListener {
 
   /**
    * The status that the listener is interested in.
+   * 
    * @return the status the listener has interest.
    */
   NodeStatus[] interest();
 
   /**
-   * This listener's name. 
+   * This listener's name.
    */
   String name();
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeStatus.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeStatus.java Thu Feb 21 06:38:33 2013
@@ -18,6 +18,5 @@
 package org.apache.hama.monitor.fd;
 
 public enum NodeStatus {
-    Alive, Dead
+  Alive, Dead
 }
-



Mime
View raw message