hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomm...@apache.org
Subject svn commit: r1442488 - in /hama/trunk/core/src/main/java/org/apache/hama: bsp/ bsp/ft/ bsp/message/ bsp/message/io/ bsp/message/queue/ bsp/sync/ bsp/taskallocation/ monitor/ monitor/fd/
Date Tue, 05 Feb 2013 07:59:40 GMT
Author: tommaso
Date: Tue Feb  5 07:59:39 2013
New Revision: 1442488

URL: http://svn.apache.org/viewvc?rev=1442488&view=rev
Log:
HAMA-572 - fixed unnecessary (un)boxing, old style loops, etc.

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
    hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue Feb  5 07:59:39 2013
@@ -1123,9 +1123,9 @@ public class BSPJobClient extends Config
       out.writeInt(partitionID);
       bytes.write(out);
       WritableUtils.writeVInt(out, locations.length);
-      for (int i = 0; i < locations.length; i++) {
-        Text.writeString(out, locations[i]);
-      }
+        for (String location : locations) {
+            Text.writeString(out, location);
+        }
     }
 
     public long getDataLength() {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Tue Feb  5 07:59:39 2013
@@ -127,7 +127,7 @@ public class BSPMaster implements JobSub
       .createImmutable((short) 0700); // rwx------
 
   // Jobs' Meta Data
-  private Integer nextJobId = Integer.valueOf(1);
+  private Integer nextJobId = 1;
   private int totalSubmissions = 0; // how many jobs has been submitted by clients
   private int totalTasks = 0; // currnetly running tasks
   private int totalTaskCapacity; // max tasks that groom server can run
@@ -439,17 +439,17 @@ public class BSPMaster implements JobSub
 
   void deleteLocalFiles() throws IOException {
     String[] localDirs = getLocalDirs();
-    for (int i = 0; i < localDirs.length; i++) {
-      FileSystem.getLocal(conf).delete(new Path(localDirs[i]), true);
-    }
+      for (String localDir : localDirs) {
+          FileSystem.getLocal(conf).delete(new Path(localDir), true);
+      }
   }
 
   void deleteLocalFiles(String subdir) throws IOException {
     try {
       String[] localDirs = getLocalDirs();
-      for (int i = 0; i < localDirs.length; i++) {
-        FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir), true);
-      }
+        for (String localDir : localDirs) {
+            FileSystem.getLocal(conf).delete(new Path(localDir, subdir), true);
+        }
     } catch (NullPointerException e) {
       LOG.info(e);
     }
@@ -588,7 +588,7 @@ public class BSPMaster implements JobSub
     int id;
     synchronized (nextJobId) {
       id = nextJobId;
-      nextJobId = Integer.valueOf(id + 1);
+      nextJobId = id + 1;
     }
     return new BSPJobID(this.masterIdentifier, id);
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java Tue Feb  5 07:59:39 2013
@@ -210,12 +210,12 @@ public abstract class CombineFileInputFo
 
     // Finally, process all paths that do not belong to any pool.
     ArrayList<Path> myPaths = new ArrayList<Path>();
-    for (int i = 0; i < paths.length; i++) {
-      if (paths[i] == null) { // already processed
-        continue;
+      for (Path path : paths) {
+          if (path == null) { // already processed
+              continue;
+          }
+          myPaths.add(path);
       }
-      myPaths.add(paths[i]);
-    }
     // create splits for all files that are not in any pool.
     getMoreSplits(bspJob, myPaths.toArray(new Path[myPaths.size()]), maxSize,
         minSizeNode, minSizeRack, splits);
@@ -261,49 +261,48 @@ public abstract class CombineFileInputFo
 
     // process all nodes and create splits that are local
     // to a node.
-    for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
-        .entrySet().iterator(); iter.hasNext();) {
+      for (Map.Entry<String, List<OneBlockInfo>> one : nodeToBlocks
+              .entrySet()) {
 
-      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-      nodes.add(one.getKey());
-      List<OneBlockInfo> blocksInNode = one.getValue();
-
-      // for each block, copy it into validBlocks. Delete it from
-      // blockToNodes so that the same block does not appear in
-      // two different splits.
-      for (OneBlockInfo oneblock : blocksInNode) {
-        if (blockToNodes.containsKey(oneblock)) {
-          validBlocks.add(oneblock);
-          blockToNodes.remove(oneblock);
-          curSplitSize += oneblock.length;
-
-          // if the accumulated split size exceeds the maximum, then
-          // create this split.
-          if (maxSize != 0 && curSplitSize >= maxSize) {
-            // create an input split and add it to the splits array
-            addCreatedSplit(job, splits, nodes, validBlocks);
-            curSplitSize = 0;
-            validBlocks.clear();
+          nodes.add(one.getKey());
+          List<OneBlockInfo> blocksInNode = one.getValue();
+
+          // for each block, copy it into validBlocks. Delete it from
+          // blockToNodes so that the same block does not appear in
+          // two different splits.
+          for (OneBlockInfo oneblock : blocksInNode) {
+              if (blockToNodes.containsKey(oneblock)) {
+                  validBlocks.add(oneblock);
+                  blockToNodes.remove(oneblock);
+                  curSplitSize += oneblock.length;
+
+                  // if the accumulated split size exceeds the maximum, then
+                  // create this split.
+                  if (maxSize != 0 && curSplitSize >= maxSize) {
+                      // create an input split and add it to the splits array
+                      addCreatedSplit(job, splits, nodes, validBlocks);
+                      curSplitSize = 0;
+                      validBlocks.clear();
+                  }
+              }
           }
-        }
-      }
-      // if there were any blocks left over and their combined size is
-      // larger than minSplitNode, then combine them into one split.
-      // Otherwise add them back to the unprocessed pool. It is likely
-      // that they will be combined with other blocks from the same rack later
-      // on.
-      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(job, splits, nodes, validBlocks);
-      } else {
-        for (OneBlockInfo oneblock : validBlocks) {
-          blockToNodes.put(oneblock, oneblock.hosts);
-        }
+          // if there were any blocks left over and their combined size is
+          // larger than minSplitNode, then combine them into one split.
+          // Otherwise add them back to the unprocessed pool. It is likely
+          // that they will be combined with other blocks from the same rack later
+          // on.
+          if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(job, splits, nodes, validBlocks);
+          } else {
+              for (OneBlockInfo oneblock : validBlocks) {
+                  blockToNodes.put(oneblock, oneblock.hosts);
+              }
+          }
+          validBlocks.clear();
+          nodes.clear();
+          curSplitSize = 0;
       }
-      validBlocks.clear();
-      nodes.clear();
-      curSplitSize = 0;
-    }
 
     // if blocks in a rack are below the specified minimum size, then keep them
     // in 'overflow'. After the processing of all racks is complete, these
@@ -323,58 +322,57 @@ public abstract class CombineFileInputFo
       // split size).
 
       // iterate over all racks
-      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = rackToBlocks
-          .entrySet().iterator(); iter.hasNext();) {
+        for (Map.Entry<String, List<OneBlockInfo>> one : rackToBlocks
+                .entrySet()) {
 
-        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-        racks.add(one.getKey());
-        List<OneBlockInfo> blocks = one.getValue();
-
-        // for each block, copy it into validBlocks. Delete it from
-        // blockToNodes so that the same block does not appear in
-        // two different splits.
-        boolean createdSplit = false;
-        for (OneBlockInfo oneblock : blocks) {
-          if (blockToNodes.containsKey(oneblock)) {
-            validBlocks.add(oneblock);
-            blockToNodes.remove(oneblock);
-            curSplitSize += oneblock.length;
-
-            // if the accumulated split size exceeds the maximum, then
-            // create this split.
-            if (maxSize != 0 && curSplitSize >= maxSize) {
-              // create an input split and add it to the splits array
-              addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-              createdSplit = true;
-              break;
+            racks.add(one.getKey());
+            List<OneBlockInfo> blocks = one.getValue();
+
+            // for each block, copy it into validBlocks. Delete it from
+            // blockToNodes so that the same block does not appear in
+            // two different splits.
+            boolean createdSplit = false;
+            for (OneBlockInfo oneblock : blocks) {
+                if (blockToNodes.containsKey(oneblock)) {
+                    validBlocks.add(oneblock);
+                    blockToNodes.remove(oneblock);
+                    curSplitSize += oneblock.length;
+
+                    // if the accumulated split size exceeds the maximum, then
+                    // create this split.
+                    if (maxSize != 0 && curSplitSize >= maxSize) {
+                        // create an input split and add it to the splits array
+                        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+                        createdSplit = true;
+                        break;
+                    }
+                }
             }
-          }
-        }
 
-        // if we created a split, then just go to the next rack
-        if (createdSplit) {
-          curSplitSize = 0;
-          validBlocks.clear();
-          racks.clear();
-          continue;
-        }
+            // if we created a split, then just go to the next rack
+            if (createdSplit) {
+                curSplitSize = 0;
+                validBlocks.clear();
+                racks.clear();
+                continue;
+            }
 
-        if (!validBlocks.isEmpty()) {
-          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
-            // if there is a mimimum size specified, then create a single split
-            // otherwise, store these blocks into overflow data structure
-            addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-          } else {
-            // There were a few blocks in this rack that remained to be
-            // processed.
-            // Keep them in 'overflow' block list. These will be combined later.
-            overflowBlocks.addAll(validBlocks);
-          }
+            if (!validBlocks.isEmpty()) {
+                if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+                    // if there is a mimimum size specified, then create a single split
+                    // otherwise, store these blocks into overflow data structure
+                    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+                } else {
+                    // There were a few blocks in this rack that remained to be
+                    // processed.
+                    // Keep them in 'overflow' block list. These will be combined later.
+                    overflowBlocks.addAll(validBlocks);
+                }
+            }
+            curSplitSize = 0;
+            validBlocks.clear();
+            racks.clear();
         }
-        curSplitSize = 0;
-        validBlocks.clear();
-        racks.clear();
-      }
     }
 
     assert blockToNodes.isEmpty();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java Tue Feb  5 07:59:39 2013
@@ -187,9 +187,9 @@ public class CombineFileSplit implements
     if (locations != null) {
       String locs = "";
       StringBuffer locsb = new StringBuffer();
-      for (int i = 0; i < locations.length; i++) {
-        locsb.append(locations[i]).append(":");
-      }
+        for (String location : locations) {
+            locsb.append(location).append(":");
+        }
       locs = locsb.toString();
       sb.append(" Locations:").append(locs).append("; ");
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java Tue Feb  5 07:59:39 2013
@@ -43,9 +43,9 @@ public class Directive implements Writab
     public int value() {
       return this.t;
     }
-  };
+  }
 
-  public Directive() {
+    public Directive() {
   }
 
   public Directive(Directive.Type type) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Tue Feb  5 07:59:39 2013
@@ -96,9 +96,9 @@ public class GroomServer implements Runn
   // Constants
   static enum State {
     NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
-  };
+  }
 
-  private HttpServer server;
+    private HttpServer server;
   private ZooKeeper zk = null;
 
   // Running States and its related things
@@ -276,20 +276,17 @@ public class GroomServer implements Runn
         LOG.debug("Got " + outOfContactTasks.size() + " oblivious tasks");
       }
 
-      Iterator<TaskInProgress> taskIter = outOfContactTasks.iterator();
-
-      while (taskIter.hasNext()) {
-        TaskInProgress tip = taskIter.next();
-        try {
-          LOG.debug("Purging task " + tip);
-          purgeTask(tip, true);
-        } catch (Exception e) {
-          LOG.error(
-              new StringBuilder("Error while removing a timed-out task - ")
-                  .append(tip.toString()), e);
+        for (TaskInProgress tip : outOfContactTasks) {
+            try {
+                LOG.debug("Purging task " + tip);
+                purgeTask(tip, true);
+            } catch (Exception e) {
+                LOG.error(
+                        new StringBuilder("Error while removing a timed-out task - ")
+                                .append(tip.toString()), e);
 
+            }
         }
-      }
       outOfContactTasks.clear();
 
     }
@@ -467,15 +464,15 @@ public class GroomServer implements Runn
     LOG.debug(localDirs);
 
     if (localDirs != null) {
-      for (int i = 0; i < localDirs.length; i++) {
-        try {
-          LOG.info(localDirs[i]);
-          DiskChecker.checkDir(new File(localDirs[i]));
-          writable = true;
-        } catch (DiskErrorException e) {
-          LOG.warn("BSP Processor local " + e.getMessage());
+        for (String localDir : localDirs) {
+            try {
+                LOG.info(localDir);
+                DiskChecker.checkDir(new File(localDir));
+                writable = true;
+            } catch (DiskErrorException e) {
+                LOG.warn("BSP Processor local " + e.getMessage());
+            }
         }
-      }
     }
 
     if (!writable)
@@ -488,18 +485,18 @@ public class GroomServer implements Runn
 
   public void deleteLocalFiles() throws IOException {
     String[] localDirs = getLocalDirs();
-    for (int i = 0; i < localDirs.length; i++) {
-      FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true);
-    }
+      for (String localDir : localDirs) {
+          FileSystem.getLocal(this.conf).delete(new Path(localDir), true);
+      }
   }
 
   public void deleteLocalFiles(String subdir) throws IOException {
     try {
       String[] localDirs = getLocalDirs();
-      for (int i = 0; i < localDirs.length; i++) {
-        FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir),
-            true);
-      }
+        for (String localDir : localDirs) {
+            FileSystem.getLocal(this.conf).delete(new Path(localDir, subdir),
+                    true);
+        }
     } catch (NullPointerException e) {
       LOG.info(e);
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java Tue Feb  5 07:59:39 2013
@@ -56,9 +56,9 @@ public abstract class GroomServerAction 
 
     /** Update information on a peer. */
     UPDATE_PEER
-  };
+  }
 
-  /**
+    /**
    * A factory-method to create objects of given {@link ActionType}.
    * 
    * @param actionType the {@link ActionType} of object to create.

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java Tue Feb  5 07:59:39 2013
@@ -123,14 +123,13 @@ public class GroomServerStatus implement
    */
   public int countTasks() {
     int taskCount = 0;
-    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
-      TaskStatus ts = it.next();
-      TaskStatus.State state = ts.getRunState();
-      if (state == TaskStatus.State.RUNNING
-          || state == TaskStatus.State.UNASSIGNED) {
-        taskCount++;
+      for (TaskStatus ts : taskReports) {
+          TaskStatus.State state = ts.getRunState();
+          if (state == TaskStatus.State.RUNNING
+                  || state == TaskStatus.State.UNASSIGNED) {
+              taskCount++;
+          }
       }
-    }
 
     return taskCount;
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Tue Feb  5 07:59:39 2013
@@ -340,26 +340,25 @@ public class JobInProgress {
     Task result = null;
     BSPResource[] resources = new BSPResource[0];
 
-    for (int i = 0; i < tasks.length; i++) {
-      if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
+      for (TaskInProgress task : tasks) {
+          if (!task.isRunning() && !task.isComplete()) {
 
-        String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
-            groomStatuses, taskCountInGroomMap, resources, tasks[i]);
-        GroomServerStatus groomStatus = taskAllocationStrategy
-            .getGroomToAllocate(groomStatuses, selectedGrooms,
-                taskCountInGroomMap, resources, tasks[i]);
-        if (groomStatus != null){
-          result = tasks[i].constructTask(groomStatus);
-        }
-        else if (LOG.isDebugEnabled()){
-        	LOG.debug("Could not find a groom to schedule task");
-        }
-        if (result != null) {
-          updateGroomTaskDetails(tasks[i].getGroomServerStatus(), result);
-        }
-        break;
+              String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
+                      groomStatuses, taskCountInGroomMap, resources, task);
+              GroomServerStatus groomStatus = taskAllocationStrategy
+                      .getGroomToAllocate(groomStatuses, selectedGrooms,
+                              taskCountInGroomMap, resources, task);
+              if (groomStatus != null) {
+                  result = task.constructTask(groomStatus);
+              } else if (LOG.isDebugEnabled()) {
+                  LOG.debug("Could not find a groom to schedule task");
+              }
+              if (result != null) {
+                  updateGroomTaskDetails(task.getGroomServerStatus(), result);
+              }
+              break;
+          }
       }
-    }
 
     counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
     return result;
@@ -543,9 +542,9 @@ public class JobInProgress {
       //
       // kill all TIPs.
       //
-      for (int i = 0; i < tasks.length; i++) {
-        tasks[i].kill();
-      }
+        for (TaskInProgress task : tasks) {
+            task.kill();
+        }
 
       garbageCollect();
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Tue Feb  5 07:59:39 2013
@@ -181,51 +181,51 @@ public class PartitioningRunner extends
     // merge files into one.
     // TODO if we use header info, we might able to merge files without full
     // scan.
-    for (int j = 0; j < status.length; j++) {
-      int partitionID = Integer.parseInt(status[j].getPath().getName()
-          .split("[-]")[1]);
-      int denom = desiredNum / peer.getNumPeers();
-      int assignedID = partitionID;
-      if(denom > 1) {
-        assignedID = partitionID / denom;
-      }
-      
-      if (assignedID == peer.getNumPeers())
-        assignedID = assignedID - 1;
-
-      // TODO set replica factor to 1.
-      // TODO and check whether we can write to specific DataNode.
-      if (assignedID == peer.getPeerIndex()) {
-        Path partitionFile = new Path(partitionDir + "/"
-            + getPartitionName(partitionID));
-
-        FileStatus[] files = fs.listStatus(status[j].getPath());
-        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-            partitionFile, outputKeyClass, outputValueClass,
-            CompressionType.NONE);
-
-        for (int i = 0; i < files.length; i++) {
-          LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
-              + "/" + getPartitionName(partitionID));
-
-          SequenceFile.Reader reader = new SequenceFile.Reader(fs,
-              files[i].getPath(), conf);
-
-          Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
-              conf);
-          Writable value = (Writable) ReflectionUtils.newInstance(
-              outputValueClass, conf);
-
-          while (reader.next(key, value)) {
-            writer.append(key, value);
+      for (FileStatus statu : status) {
+          int partitionID = Integer.parseInt(statu.getPath().getName()
+                  .split("[-]")[1]);
+          int denom = desiredNum / peer.getNumPeers();
+          int assignedID = partitionID;
+          if (denom > 1) {
+              assignedID = partitionID / denom;
           }
-          reader.close();
-        }
 
-        writer.close();
-        fs.delete(status[j].getPath(), true);
+          if (assignedID == peer.getNumPeers())
+              assignedID = assignedID - 1;
+
+          // TODO set replica factor to 1.
+          // TODO and check whether we can write to specific DataNode.
+          if (assignedID == peer.getPeerIndex()) {
+              Path partitionFile = new Path(partitionDir + "/"
+                      + getPartitionName(partitionID));
+
+              FileStatus[] files = fs.listStatus(statu.getPath());
+              SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+                      partitionFile, outputKeyClass, outputValueClass,
+                      CompressionType.NONE);
+
+              for (int i = 0; i < files.length; i++) {
+                  LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
+                          + "/" + getPartitionName(partitionID));
+
+                  SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+                          files[i].getPath(), conf);
+
+                  Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
+                          conf);
+                  Writable value = (Writable) ReflectionUtils.newInstance(
+                          outputValueClass, conf);
+
+                  while (reader.next(key, value)) {
+                      writer.append(key, value);
+                  }
+                  reader.close();
+              }
+
+              writer.close();
+              fs.delete(statu.getPath(), true);
+          }
       }
-    }
   }
 
   @SuppressWarnings("rawtypes")

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Tue Feb  5 07:59:39 2013
@@ -251,18 +251,16 @@ class SimpleTaskScheduler extends TaskSc
       }
 
       // assembly into actions
-      Iterator<Task> taskIter = taskSet.iterator();
-      while (taskIter.hasNext()) {
-        Task task = taskIter.next();
-        GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
-        List<GroomServerAction> taskActions = actionMap.get(groomStatus);
-        if (taskActions == null) {
-          taskActions = new ArrayList<GroomServerAction>(
-              groomStatus.getMaxTasks());
+        for (Task task : taskSet) {
+            GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
+            List<GroomServerAction> taskActions = actionMap.get(groomStatus);
+            if (taskActions == null) {
+                taskActions = new ArrayList<GroomServerAction>(
+                        groomStatus.getMaxTasks());
+            }
+            taskActions.add(new LaunchTaskAction(task));
+            actionMap.put(groomStatus, taskActions);
         }
-        taskActions.add(new LaunchTaskAction(task));
-        actionMap.put(groomStatus, taskActions);
-      }
 
       sendDirectivesToGrooms(actionMap);
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java Tue Feb  5 07:59:39 2013
@@ -27,9 +27,9 @@ import org.apache.hadoop.io.WritableUtil
 public class TaskCompletionEvent implements Writable {
   static public enum Status {
     FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED
-  };
+  }
 
-  private int eventId;
+    private int eventId;
   private String groomServerInfo;
   private int taskRunTime; // using int since runtime is the time difference
   private TaskAttemptID taskId;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Tue Feb  5 07:59:39 2013
@@ -145,18 +145,17 @@ public class TaskInProgress {
       Map<GroomServerStatus, Integer> tasksInGroomMap,
       String[] possibleLocations) {
 
-    for (int i = 0; i < possibleLocations.length; ++i) {
-      String location = possibleLocations[i];
-      GroomServerStatus groom = grooms.get(location);
-      if (groom == null)
-        continue;
-      Integer taskInGroom = tasksInGroomMap.get(groom);
-      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-      if (taskInGroom < groom.getMaxTasks()
-          && location.equals(groom.getGroomHostName())) {
-        return groom.getGroomHostName();
+      for (String location : possibleLocations) {
+          GroomServerStatus groom = grooms.get(location);
+          if (groom == null)
+              continue;
+          Integer taskInGroom = tasksInGroomMap.get(groom);
+          taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+          if (taskInGroom < groom.getMaxTasks()
+                  && location.equals(groom.getGroomHostName())) {
+              return groom.getGroomHostName();
+          }
       }
-    }
     return null;
   }
 
@@ -169,17 +168,16 @@ public class TaskInProgress {
   private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
       Map<GroomServerStatus, Integer> tasksInGroomMap) {
 
-    Iterator<String> groomIter = grooms.keySet().iterator();
-    while (groomIter.hasNext()) {
-      GroomServerStatus groom = grooms.get(groomIter.next());
-      if (groom == null)
-        continue;
-      Integer taskInGroom = tasksInGroomMap.get(groom);
-      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-      if (taskInGroom < groom.getMaxTasks()) {
-        return groom.getGroomHostName();
+      for (String s : grooms.keySet()) {
+          GroomServerStatus groom = grooms.get(s);
+          if (groom == null)
+              continue;
+          Integer taskInGroom = tasksInGroomMap.get(groom);
+          taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+          if (taskInGroom < groom.getMaxTasks()) {
+              return groom.getGroomHostName();
+          }
       }
-    }
     return null;
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Tue Feb  5 07:59:39 2013
@@ -112,9 +112,9 @@ public class TaskLog {
     File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter(
         purgeTimeStamp));
     if (oldTaskLogs != null) {
-      for (int i = 0; i < oldTaskLogs.length; ++i) {
-        FileUtil.fullyDelete(oldTaskLogs[i]);
-      }
+        for (File oldTaskLog : oldTaskLogs) {
+            FileUtil.fullyDelete(oldTaskLog);
+        }
     }
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java Tue Feb  5 07:59:39 2013
@@ -181,12 +181,12 @@ public class TaskLogServlet extends Http
 
     String sLogOff = request.getParameter("start");
     if (sLogOff != null) {
-      start = Long.valueOf(sLogOff).longValue();
+      start = Long.valueOf(sLogOff);
     }
 
     String sLogEnd = request.getParameter("end");
     if (sLogEnd != null) {
-      end = Long.valueOf(sLogEnd).longValue();
+      end = Long.valueOf(sLogEnd);
     }
 
     String sPlainText = request.getParameter("plaintext");

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Tue Feb  5 07:59:39 2013
@@ -181,11 +181,11 @@ public class TaskRunner extends Thread {
       }
       File[] libs = new File(workDir, "lib").listFiles();
       if (libs != null) {
-        for (int i = 0; i < libs.length; i++) {
-          // add libs from jar to classpath
-          classPath.append(SYSTEM_PATH_SEPARATOR);
-          classPath.append(libs[i]);
-        }
+          for (File lib : libs) {
+              // add libs from jar to classpath
+              classPath.append(SYSTEM_PATH_SEPARATOR);
+              classPath.append(lib);
+          }
       }
       classPath.append(SYSTEM_PATH_SEPARATOR);
       classPath.append(new File(workDir, "classes"));

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java Tue Feb  5 07:59:39 2013
@@ -131,10 +131,10 @@ public class AsyncRcvdMsgCheckpointImpl<
 
       Map<TaskID, TaskInProgress> recoverySet = new HashMap<TaskID, TaskInProgress>(
           2 * failedTasksInProgress.length);
-      for (int i = 0; i < failedTasksInProgress.length; ++i) {
-        recoverySet.put(failedTasksInProgress[i].getTaskId(),
-            failedTasksInProgress[i]);
-      }
+        for (TaskInProgress failedTasksInProgres : failedTasksInProgress) {
+            recoverySet.put(failedTasksInProgres.getTaskId(),
+                    failedTasksInProgres);
+        }
 
       long lowestSuperstepNumber = Long.MAX_VALUE;
 
@@ -152,31 +152,31 @@ public class AsyncRcvdMsgCheckpointImpl<
       }
 
       if (taskProgress.length == this.tasks.length) {
-        for (int i = 0; i < taskProgress.length; ++i) {
-          ArrayWritable progressInformation = new ArrayWritable(
-              LongWritable.class);
-          boolean result = this.masterSyncClient.getInformation(
-              this.masterSyncClient.constructKey(jobId, "checkpoint",
-                  taskProgress[i]), progressInformation);
-
-          if (!result) {
-            lowestSuperstepNumber = -1L;
-            break;
-          }
+          for (String taskProgres : taskProgress) {
+              ArrayWritable progressInformation = new ArrayWritable(
+                      LongWritable.class);
+              boolean result = this.masterSyncClient.getInformation(
+                      this.masterSyncClient.constructKey(jobId, "checkpoint",
+                              taskProgres), progressInformation);
+
+              if (!result) {
+                  lowestSuperstepNumber = -1L;
+                  break;
+              }
 
-          Writable[] progressArr = progressInformation.get();
-          LongWritable superstepProgress = (LongWritable) progressArr[0];
+              Writable[] progressArr = progressInformation.get();
+              LongWritable superstepProgress = (LongWritable) progressArr[0];
 
-          if (superstepProgress != null) {
-            if (superstepProgress.get() < lowestSuperstepNumber) {
-              lowestSuperstepNumber = superstepProgress.get();
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Got superstep number " + lowestSuperstepNumber
-                    + " from " + taskProgress[i]);
+              if (superstepProgress != null) {
+                  if (superstepProgress.get() < lowestSuperstepNumber) {
+                      lowestSuperstepNumber = superstepProgress.get();
+                      if (LOG.isDebugEnabled()) {
+                          LOG.debug("Got superstep number " + lowestSuperstepNumber
+                                  + " from " + taskProgres);
+                      }
+                  }
               }
-            }
           }
-        }
         clearClientForSuperstep(lowestSuperstepNumber);
         restartJob(lowestSuperstepNumber, groomStatuses, recoverySet,
             allTasksInProgress, taskCountInGroomMap, actionMap);
@@ -225,56 +225,56 @@ public class AsyncRcvdMsgCheckpointImpl<
 
       if (superstep >= 0) {
         FileSystem fileSystem = FileSystem.get(conf);
-        for (int i = 0; i < allTasks.length; ++i) {
-          String[] hosts = null;
-          if (recoveryMap.containsKey(allTasks[i].getTaskId())) {
-
-            // Update task count in map.
-            // TODO: This should be a responsibility of GroomServerStatus
-            Integer count = taskCountInGroomMap.get(allTasks[i]
-                .getGroomServerStatus());
-            if (count != null) {
-              count = count.intValue() - 1;
-              taskCountInGroomMap
-                  .put(allTasks[i].getGroomServerStatus(), count);
-            }
-
-            StringBuffer ckptPath = new StringBuffer(path);
-            ckptPath.append(this.jobId.toString());
-            ckptPath.append("/").append(superstep).append("/")
-                .append(allTasks[i].getTaskId().getId());
-            Path checkpointPath = new Path(ckptPath.toString());
-            if (fileSystem.exists(checkpointPath)) {
-              FileStatus fileStatus = fileSystem.getFileStatus(checkpointPath);
-              BlockLocation[] blocks = fileSystem.getFileBlockLocations(
-                  fileStatus, 0, fileStatus.getLen());
-              hosts = blocks[0].getHosts();
-            } else {
-              hosts = new String[groomStatuses.keySet().size()];
-              groomStatuses.keySet().toArray(hosts);
-            }
-            GroomServerStatus serverStatus = this.allocationStrategy
-                .getGroomToAllocate(groomStatuses, hosts, taskCountInGroomMap,
-                    new BSPResource[0], allTasks[i]);
-            Task task = allTasks[i].constructTask(serverStatus);
-            populateAction(task, superstep, serverStatus, actionMap);
+          for (TaskInProgress allTask : allTasks) {
+              String[] hosts = null;
+              if (recoveryMap.containsKey(allTask.getTaskId())) {
+
+                  // Update task count in map.
+                  // TODO: This should be a responsibility of GroomServerStatus
+                  Integer count = taskCountInGroomMap.get(allTask
+                          .getGroomServerStatus());
+                  if (count != null) {
+                      count = count.intValue() - 1;
+                      taskCountInGroomMap
+                              .put(allTask.getGroomServerStatus(), count);
+                  }
+
+                  StringBuffer ckptPath = new StringBuffer(path);
+                  ckptPath.append(this.jobId.toString());
+                  ckptPath.append("/").append(superstep).append("/")
+                          .append(allTask.getTaskId().getId());
+                  Path checkpointPath = new Path(ckptPath.toString());
+                  if (fileSystem.exists(checkpointPath)) {
+                      FileStatus fileStatus = fileSystem.getFileStatus(checkpointPath);
+                      BlockLocation[] blocks = fileSystem.getFileBlockLocations(
+                              fileStatus, 0, fileStatus.getLen());
+                      hosts = blocks[0].getHosts();
+                  } else {
+                      hosts = new String[groomStatuses.keySet().size()];
+                      groomStatuses.keySet().toArray(hosts);
+                  }
+                  GroomServerStatus serverStatus = this.allocationStrategy
+                          .getGroomToAllocate(groomStatuses, hosts, taskCountInGroomMap,
+                                  new BSPResource[0], allTask);
+                  Task task = allTask.constructTask(serverStatus);
+                  populateAction(task, superstep, serverStatus, actionMap);
 
-          } else {
-            restartTask(allTasks[i], superstep, groomStatuses, actionMap);
+              } else {
+                  restartTask(allTask, superstep, groomStatuses, actionMap);
+              }
           }
-        }
       } else {
         // Start the task from the beginning.
-        for (int i = 0; i < allTasks.length; ++i) {
-          if (recoveryMap.containsKey(allTasks[i].getTaskId())) {
-            this.allocationStrategy.getGroomToAllocate(groomStatuses,
-                this.allocationStrategy.selectGrooms(groomStatuses,
-                    taskCountInGroomMap, new BSPResource[0], allTasks[i]),
-                taskCountInGroomMap, new BSPResource[0], allTasks[i]);
-          } else {
-            restartTask(allTasks[i], superstep, groomStatuses, actionMap);
+          for (TaskInProgress allTask : allTasks) {
+              if (recoveryMap.containsKey(allTask.getTaskId())) {
+                  this.allocationStrategy.getGroomToAllocate(groomStatuses,
+                          this.allocationStrategy.selectGrooms(groomStatuses,
+                                  taskCountInGroomMap, new BSPResource[0], allTask),
+                          taskCountInGroomMap, new BSPResource[0], allTask);
+              } else {
+                  restartTask(allTask, superstep, groomStatuses, actionMap);
+              }
           }
-        }
       }
     }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Tue Feb  5 07:59:39 2013
@@ -229,35 +229,27 @@ public abstract class AbstractMessageMan
   }
 
   private void notifySentMessage(String peerName, M message) {
-    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
-        .iterator();
-    while (iterator.hasNext()) {
-      iterator.next().onMessageSent(peerName, message);
-    }
+      for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+          aMessageListenerQueue.onMessageSent(peerName, message);
+      }
   }
 
   private void notifyReceivedMessage(M message) throws IOException {
-    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
-        .iterator();
-    while (iterator.hasNext()) {
-      iterator.next().onMessageReceived(message);
-    }
+      for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+          aMessageListenerQueue.onMessageReceived(message);
+      }
   }
 
   private void notifyInit() {
-    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
-        .iterator();
-    while (iterator.hasNext()) {
-      iterator.next().onInitialized();
-    }
+      for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+          aMessageListenerQueue.onInitialized();
+      }
   }
 
   private void notifyClose() {
-    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
-        .iterator();
-    while (iterator.hasNext()) {
-      iterator.next().onClose();
-    }
+      for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+          aMessageListenerQueue.onClose();
+      }
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java Tue Feb  5 07:59:39 2013
@@ -247,9 +247,9 @@ public class SpillingDataOutputBuffer ex
       startedSpilling_ = false;
       bufferState_.clear();
 
-      for (int i = 0; i < bufferList_.size(); ++i) {
-        bufferList_.get(i).clear();
-      }
+        for (ByteBuffer aBufferList_ : bufferList_) {
+            aBufferList_.clear();
+        }
       currentBuffer_ = bufferList_.get(0);
       bytesWritten_ = 0L;
       bytesRemaining_ = bufferSize_;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Tue Feb  5 07:59:39 2013
@@ -152,10 +152,9 @@ public class SpillingQueue<M extends Wri
 
   @Override
   public void addAll(MessageQueue<M> arg0) {
-    Iterator<M> iter = arg0.iterator();
-    while (iter.hasNext()) {
-      add(iter.next());
-    }
+      for (M anArg0 : arg0) {
+          add(anArg0);
+      }
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java Tue Feb  5 07:59:39 2013
@@ -58,9 +58,9 @@ public class ZKSyncEventFactory {
               .append(getEventCount()-1)).toString());
     }
     
-  };
-  
-  public static int getSupportedEventCount(){
+  }
+
+    public static int getSupportedEventCount(){
     return ZKEvent.getEventCount();
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java Tue Feb  5 07:59:39 2013
@@ -53,17 +53,16 @@ public class BestEffortDataLocalTaskAllo
   private static String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
       Map<GroomServerStatus, Integer> tasksInGroomMap) {
 
-    Iterator<String> groomIter = grooms.keySet().iterator();
-    while (groomIter.hasNext()) {
-      GroomServerStatus groom = grooms.get(groomIter.next());
-      if (groom == null)
-        continue;
-      Integer taskInGroom = tasksInGroomMap.get(groom);
-      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-      if (taskInGroom < groom.getMaxTasks()) {
-        return groom.getGroomHostName();
+      for (String s : grooms.keySet()) {
+          GroomServerStatus groom = grooms.get(s);
+          if (groom == null)
+              continue;
+          Integer taskInGroom = tasksInGroomMap.get(groom);
+          taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+          if (taskInGroom < groom.getMaxTasks()) {
+              return groom.getGroomHostName();
+          }
       }
-    }
     return null;
   }
 
@@ -80,26 +79,25 @@ public class BestEffortDataLocalTaskAllo
       Map<GroomServerStatus, Integer> tasksInGroomMap,
       String[] possibleLocations) {
 
-    for (int i = 0; i < possibleLocations.length; ++i) {
-      String location = possibleLocations[i];
-      GroomServerStatus groom = grooms.get(location);
-      if (groom == null){
-        if(LOG.isDebugEnabled()){
-          LOG.debug("Could not find groom for location " + location);
-        }
-        continue;
-      }
-      Integer taskInGroom = tasksInGroomMap.get(groom);
-      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-      if(LOG.isDebugEnabled()){
-        LOG.debug("taskInGroom = " + taskInGroom + " max tasks = " + groom.getMaxTasks()
-            + " location = " + location + " groomhostname = " + groom.getGroomHostName());
+      for (String location : possibleLocations) {
+          GroomServerStatus groom = grooms.get(location);
+          if (groom == null) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug("Could not find groom for location " + location);
+              }
+              continue;
+          }
+          Integer taskInGroom = tasksInGroomMap.get(groom);
+          taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+          if (LOG.isDebugEnabled()) {
+              LOG.debug("taskInGroom = " + taskInGroom + " max tasks = " + groom.getMaxTasks()
+                      + " location = " + location + " groomhostname = " + groom.getGroomHostName());
+          }
+          if (taskInGroom < groom.getMaxTasks()
+                  && location.equals(groom.getGroomHostName())) {
+              return groom.getGroomHostName();
+          }
       }
-      if (taskInGroom < groom.getMaxTasks()
-          && location.equals(groom.getGroomHostName())) {
-        return groom.getGroomHostName();
-      }
-    }
     if(LOG.isDebugEnabled()){
       LOG.debug("Returning null");
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java Tue Feb  5 07:59:39 2013
@@ -68,7 +68,7 @@ public final class Configurator {
         if(null != t) {
           t.setListener(listener);
           taskList.put(jarPath, t);
-          repos.put(jarPath, new Long(jar.lastModified()));
+          repos.put(jarPath, jar.lastModified());
           LOG.debug(jar.getName()+" is loaded.");
         }
       }

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java Tue Feb  5 07:59:39 2013
@@ -215,8 +215,7 @@ public final class Monitor extends Threa
             // znode must exists so that child (znode/name) can be created.
             if (null != this.zk.exists(znode, false)) {
               String suffix = suffix(value);
-              ;
-              if (LOG.isDebugEnabled()) {
+                if (LOG.isDebugEnabled()) {
                 LOG.debug("Publish name [" + name + "] and value [" + value
                     + "] to zk.");
               }

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java Tue Feb  5 07:59:39 2013
@@ -19,6 +19,7 @@ package org.apache.hama.monitor;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -114,7 +115,7 @@ public final class ZKCollector implement
             LOG.info("metrics " + name + " value:" + lv);
             record.add(new Metric<Long>(name, lv));
           } else if ("b".equals(dataType)) {
-            LOG.info("metrics" + name + " value:" + dataInBytes);
+            LOG.info("metrics" + name + " value:" + Arrays.toString(dataInBytes));
             record.add(new Metric<byte[]>(name, dataInBytes));
           } else {
             LOG.warn("Unkown data type for metrics name: " + child);

Modified: hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java?rev=1442488&r1=1442487&r2=1442488&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java Tue Feb  5 07:59:39 2013
@@ -152,7 +152,7 @@ public class UDPSupervisor implements Su
         if (getSamplingWindow().size() == windowSize()) {
           getSamplingWindow().remove();
         }
-        getSamplingWindow().add(new Double(heartbeat - getLatestHeartbeat()));
+        getSamplingWindow().add((double) (heartbeat - getLatestHeartbeat()));
       }
       setLatestHeartbeat(heartbeat);
     }



Mime
View raw message