incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1045013 - in /incubator/hama/trunk: ./ conf/ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/
Date Mon, 13 Dec 2010 03:34:29 GMT
Author: edwardyoon
Date: Mon Dec 13 03:34:28 2010
New Revision: 1045013

URL: http://svn.apache.org/viewvc?rev=1045013&view=rev
Log:
Few minor refactoring

Added:
    incubator/hama/trunk/src/java/org/apache/hama/ipc/InterServerProtocol.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobContext.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1045013&r1=1045012&r2=1045013&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Dec 13 03:34:28 2010
@@ -51,6 +51,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-278: Few minor refactoring (edwardyoon)
     HAMA-336: The all taskid variable's type should be declared as a TaskAttemptID
                        (edwardyoon)
     HAMA-331: Removing JobInProgressListener 

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1045013&r1=1045012&r2=1045013&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Mon Dec 13 03:34:28 2010
@@ -56,13 +56,6 @@
     <value>/tmp/hama-${user.name}</value>
     <description>Temporary directory on the local filesystem.</description>
   </property>
-  <property>
-    <name>bsp.peers.num</name>
-    <value>1</value>
-    <description>The default number of bsp tasks per job.  Typically set
-    to a prime several times greater than number of available hosts.
-    </description>
-  </property>
 
   <!--
   Beginning of properties that are directly mapped from ZooKeeper's zoo.cfg.

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=1045013&r1=1045012&r2=1045013&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Mon Dec 13 03:34:28 2010
@@ -85,7 +85,7 @@ public class BSPJob extends BSPJobContex
   public void setWorkingDirectory(Path dir) throws IOException {
     ensureState(JobState.DEFINE);
     dir = new Path(getWorkingDirectory(), dir);
-    conf.set("bsp.working.dir", dir.toString());
+    conf.set(WORKING_DIR, dir.toString());
   }
 
   /**
@@ -215,6 +215,6 @@ public class BSPJob extends BSPJobContex
   }
 
   public int getNumBspTask() {
-    return conf.getInt("bsp.peers.num", 1);
+    return conf.getInt("bsp.peers.num", 0);
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1045013&r1=1045012&r2=1045013&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Mon Dec 13 03:34:28
2010
@@ -386,7 +386,8 @@ public class BSPJobClient extends Config
 
     // TODO this code must be removed
     // when GroomServer supports the multiple tasks.
-    if (job.getNumBspTask() > jc.getClusterStatus(false).getGroomServers()) {
+    if (job.getNumBspTask() == 0
+        || job.getNumBspTask() > jc.getClusterStatus(false).getGroomServers()) {
       // If the number of tasks is greater than the number of GroomServer,
       // reset the number of tasks as number of GroomServer.
       job.setNumBspTask(jc.getClusterStatus(false).getGroomServers());
@@ -462,7 +463,7 @@ public class BSPJobClient extends Config
     boolean listActiveGrooms = false;
     boolean killJob = false;
     String jobid = null;
-    
+
     HamaConfiguration conf = new HamaConfiguration(getConf());
     init(conf);
 
@@ -504,10 +505,10 @@ public class BSPJobClient extends Config
     } else if (killJob) {
       RunningJob job = jc.getJob(new BSPJobID().forName(jobid));
       if (job == null) {
-          System.out.println("Could not find job " + jobid);
+        System.out.println("Could not find job " + jobid);
       } else {
-          job.killJob();
-          System.out.println("Killed job " + jobid);
+        job.killJob();
+        System.out.println("Killed job " + jobid);
       }
       exitCode = 0;
     }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobContext.java?rev=1045013&r1=1045012&r2=1045013&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobContext.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobContext.java Mon Dec 13 03:34:28
2010
@@ -35,7 +35,8 @@ public class BSPJobContext {
   protected static final String WORK_CLASS_ATTR = "bsp.work.class";
   protected static final String INPUT_FORMAT_CLASS_ATTR = "bsp.inputformat.class";
   protected static final String OUTPUT_FORMAT_CLASS_ATTR = "bsp.outputformat.class";
-
+  protected static final String WORKING_DIR = "bsp.working.dir";
+  
   protected final Configuration conf;
   private final BSPJobID jobId;
 
@@ -55,14 +56,14 @@ public class BSPJobContext {
   }
 
   public Path getWorkingDirectory() throws IOException {
-    String name = conf.get("bsp.working.dir");
+    String name = conf.get(WORKING_DIR);
 
     if (name != null) {
       return new Path(name);
     } else {
       try {
         Path dir = FileSystem.get(conf).getWorkingDirectory();
-        conf.set("bsp.working.dir", dir.toString());
+        conf.set(WORKING_DIR, dir.toString());
         return dir;
       } catch (IOException e) {
         throw new RuntimeException(e);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1045013&r1=1045012&r2=1045013&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Mon Dec 13 03:34:28 2010
@@ -46,14 +46,14 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.ipc.InterTrackerProtocol;
+import org.apache.hama.ipc.InterServerProtocol;
 import org.apache.hama.ipc.JobSubmissionProtocol;
 
 /**
  * BSPMaster is responsible to control all the groom servers and to manage bsp
  * jobs.
  */
-public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol,
+public class BSPMaster implements JobSubmissionProtocol, InterServerProtocol,
     GroomServerManager {
   public static final Log LOG = LogFactory.getLog(BSPMaster.class);
 
@@ -73,7 +73,7 @@ public class BSPMaster implements JobSub
 
   // Attributes
   String masterIdentifier;
-  private Server interTrackerServer;
+  private Server interServer;
 
   // Filesystem
   static final String SUBDIR = "bspMaster";
@@ -131,7 +131,7 @@ public class BSPMaster implements JobSub
         schedulerClass, conf);
 
     InetSocketAddress addr = getAddress(conf);
-    this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr
+    this.interServer = RPC.getServer(this, addr.getHostName(), addr
         .getPort(), conf);
 
     while (!Thread.currentThread().isInterrupted()) {
@@ -306,29 +306,29 @@ public class BSPMaster implements JobSub
     new Thread(this.initJobs).start();
     LOG.info("Starting jobInitThread");
 
-    this.interTrackerServer.start();
+    this.interServer.start();
 
     synchronized (this) {
       state = State.RUNNING;
     }
     LOG.info("Starting RUNNING");
 
-    this.interTrackerServer.join();
-    LOG.info("Stopped interTrackerServer");
+    this.interServer.join();
+    LOG.info("Stopped interServer");
   }
 
   // //////////////////////////////////////////////////
-  // InterTrackerProtocol
+  // InterServerProtocol
   // //////////////////////////////////////////////////
   @Override
   public long getProtocolVersion(String protocol, long clientVersion)
       throws IOException {
-    if (protocol.equals(InterTrackerProtocol.class.getName())) {
-      return InterTrackerProtocol.versionID;
+    if (protocol.equals(InterServerProtocol.class.getName())) {
+      return InterServerProtocol.versionID;
     } else if (protocol.equals(JobSubmissionProtocol.class.getName())) {
       return JobSubmissionProtocol.versionID;
     } else {
-      throw new IOException("Unknown protocol to job tracker: " + protocol);
+      throw new IOException("Unknown protocol to BSPMaster: " + protocol);
     }
   }
 
@@ -421,7 +421,7 @@ public class BSPMaster implements JobSub
   TreeMap<String, TreeSet<TaskAttemptID>> trackerToMarkedTasksMap = new TreeMap<String,
TreeSet<TaskAttemptID>>();
 
   private void removeMarkedTasks(String groomName) {
-    // Purge all the 'marked' tasks which were running at taskTracker
+    // Purge all the 'marked' tasks which were running at groomServer
     TreeSet<TaskAttemptID> markedTaskSet = trackerToMarkedTasksMap
         .get(groomName);
     if (markedTaskSet != null) {
@@ -689,7 +689,7 @@ public class BSPMaster implements JobSub
   }
 
   public void shutdown() {
-    this.interTrackerServer.stop();
+    this.interServer.stop();
   }
 
   public void createTaskEntry(TaskAttemptID taskid, String groomServer,

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1045013&r1=1045012&r2=1045013&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Mon Dec 13 03:34:28
2010
@@ -47,7 +47,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.ipc.InterTrackerProtocol;
+import org.apache.hama.ipc.InterServerProtocol;
 
 public class GroomServer implements Runnable {
   public static final Log LOG = LogFactory.getLog(GroomServer.class);
@@ -74,7 +74,7 @@ public class GroomServer implements Runn
   String groomServerName;
   String localHostname;
   InetSocketAddress bspMasterAddr;
-  InterTrackerProtocol jobClient;
+  InterServerProtocol jobClient;
 
   // Filesystem
   // private LocalDirAllocator localDirAllocator;
@@ -133,8 +133,8 @@ public class GroomServer implements Runn
 
     DistributedCache.purgeCache(this.conf);
 
-    this.jobClient = (InterTrackerProtocol) RPC.waitForProxy(
-        InterTrackerProtocol.class, InterTrackerProtocol.versionID,
+    this.jobClient = (InterServerProtocol) RPC.waitForProxy(
+        InterServerProtocol.class, InterServerProtocol.versionID,
         bspMasterAddr, conf);
     this.running = true;
   }

Added: incubator/hama/trunk/src/java/org/apache/hama/ipc/InterServerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/InterServerProtocol.java?rev=1045013&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/InterServerProtocol.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/InterServerProtocol.java Mon Dec 13
03:34:28 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.HeartbeatResponse;
+
+/**
+ * Protocol that a GroomServer and the central BSPMaster use to communicate. The
+ * BSPMaster is the Server, which implements this protocol.
+ */
+public interface InterServerProtocol extends HamaRPCProtocolVersion {
+  public HeartbeatResponse heartbeat(GroomServerStatus status,
+      boolean restarted, boolean initialContact, boolean acceptNewTasks,
+      short responseId, int reportSize) throws IOException;
+
+  public String getSystemDir();
+}



Mime
View raw message