incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r986194 - in /incubator/hama/trunk: bin/ conf/ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/util/
Date Tue, 17 Aug 2010 05:35:04 GMT
Author: edwardyoon
Date: Tue Aug 17 05:35:03 2010
New Revision: 986194

URL: http://svn.apache.org/viewvc?rev=986194&view=rev
Log:
Add jar command

Added:
    incubator/hama/trunk/src/java/org/apache/hama/util/RunJar.java
Modified:
    incubator/hama/trunk/bin/hama
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java

Modified: incubator/hama/trunk/bin/hama
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/hama?rev=986194&r1=986193&r2=986194&view=diff
==============================================================================
--- incubator/hama/trunk/bin/hama (original)
+++ incubator/hama/trunk/bin/hama Tue Aug 17 05:35:03 2010
@@ -162,8 +162,8 @@ elif [ "$COMMAND" = "groom" ] ; then
   BSP_OPTS="$BSP_OPTS $BSP_GROOMSERVER_OPTS"
 elif [ "$COMMAND" = "zookeeper" ] ; then
   CLASS='org.apache.hama.zookeeper.QuorumPeer'
-elif [ "$COMMAND" = "test" ] ; then
-  CLASS=org.apache.hama.bsp.BSPTestDriver
+elif [ "$COMMAND" = "jar" ] ; then
+  CLASS=org.apache.hama.util.RunJar
   BSP_OPTS="$BSP_OPTS"
 else
   CLASS=$COMMAND

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=986194&r1=986193&r2=986194&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Tue Aug 17 05:35:03 2010
@@ -57,7 +57,7 @@
   </property>
   <property>
     <name>bsp.peers.num</name>
-    <value>3</value>
+    <value>2</value>
     <description>The default number of bsp tasks per job.  Typically set
     to a prime several times greater than number of available hosts.
     </description>
@@ -117,5 +117,18 @@
     standalone and pseudo-distributed.
     </description>
   </property>
+  
+  <property>
+    <name>hama.zookeeper.quorum</name>
+    <value>localhost</value>
+    <description>Comma separated list of servers in the ZooKeeper Quorum.
+    For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
+    By default this is set to localhost for local and pseudo-distributed modes
+    of operation. For a fully-distributed setup, this should be set to a full
+    list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh
+    this is the list of servers which we will start/stop ZooKeeper on.
+    </description>
+  </property>
+  
   <!-- End of properties that are directly mapped from ZooKeeper's zoo.cfg -->
-</configuration>
\ No newline at end of file
+</configuration>

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java?rev=986194&r1=986193&r2=986194&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java Tue Aug 17 05:35:03 2010
@@ -26,6 +26,7 @@ public class ExampleDriver {
   public static void main(String[] args) {
     ProgramDriver pgd = new ProgramDriver();
     try {
+      pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
       pgd.addClass("rand", RandomMatrix.class, "Generate matrix with random elements.");
       pgd.addClass("mult", MatrixMultiplication.class, "Mat-Mat Multiplication.");
       pgd.addClass("similarity", CosineSimilarityMatrix.class, "Cosine Similarity Matrix.");

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=986194&r1=986193&r2=986194&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Tue Aug 17 05:35:03 2010
@@ -92,8 +92,8 @@ public class PiEstimator {
     HamaConfiguration conf = new HamaConfiguration();
     // Execute locally
     //conf.set("bsp.master.address", "local");
-    conf.set("bsp.master.address", "localhost:40000");
-    
+    conf.set("bsp.master.address", "slave.udanax.org:40000");
+
     BSPJob bsp = new BSPJob(conf, PiEstimator.class);
     // Set the job name
     bsp.setJobName("pi estimation example");

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=986194&r1=986193&r2=986194&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Tue Aug 17 05:35:03 2010
@@ -110,6 +110,7 @@ public class BSPJob extends BSPJobContex
     try {
       for (Enumeration itr = loader.getResources(class_file); itr
           .hasMoreElements();) {
+
         URL url = (URL) itr.nextElement();
         if ("jar".equals(url.getProtocol())) {
           String toReturn = url.getPath();

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=986194&r1=986193&r2=986194&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Tue Aug 17 05:35:03 2010
@@ -312,16 +312,21 @@ public class BSPJobClient extends Config
     
     // Create a number of filenames in the BSPMaster's fs namespace
     FileSystem fs = getFs();
-    LOG.debug("default FileSystem: " + fs.getUri());
+    LOG.info("default FileSystem: " + fs.getUri());
     fs.delete(submitJobDir, true);
     submitJobDir = fs.makeQualified(submitJobDir);
     submitJobDir = new Path(submitJobDir.toUri().getPath());
+    LOG.info("BSPJobClient.job dir: " + submitJobDir);
     FsPermission bspSysPerms = new FsPermission(JOB_DIR_PERMISSION);
     FileSystem.mkdirs(fs, submitJobDir, bspSysPerms);
+    fs.mkdirs(submitJobDir);
+    LOG.info("job dir is exist?: " + fs.isDirectory(submitJobDir));
     short replication = (short)job.getInt("bsp.submit.replication", 10);
     
     String originalJarPath = job.getJar();
 
+    LOG.info("BSPJobClient.originalJarPath: " + originalJarPath);
+    
     if (originalJarPath != null) { // copy jar to BSPMaster's fs
       // use jar name if job is not named. 
       if ("".equals(job.getJobName())){
@@ -329,6 +334,9 @@ public class BSPJobClient extends Config
       }
       job.setJar(submitJarFile.toString());
       fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
+      LOG.info("BSPJobClient copy to: " + submitJarFile);
+      LOG.info("BSPJobClient jar file: " + fs.isFile(submitJarFile));
+      
       fs.setReplication(submitJarFile, replication);
       fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
     } else {
@@ -345,6 +353,7 @@ public class BSPJobClient extends Config
       job.setWorkingDirectory(fs.getWorkingDirectory());
     }
     
+    LOG.info("Write job file: " + submitJobFile);
     // Write job file to BSPMaster's fs        
     FSDataOutputStream out = 
       FileSystem.create(fs, submitJobFile,
@@ -356,6 +365,8 @@ public class BSPJobClient extends Config
       out.close();
     }
     
+    LOG.info("BSPJobClient job file: " + fs.isFile(submitJobFile));
+    
     //
     // Now, actually submit the job (using the submit name)
     //

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=986194&r1=986193&r2=986194&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Tue Aug 17 05:35:03 2010
@@ -323,6 +323,7 @@ public class BSPMaster implements JobSub
 
   public static InetSocketAddress getAddress(Configuration conf) {
     String hamaMasterStr = conf.get("bsp.master.address", "localhost:40000");
+    System.out.println(">>>> " + hamaMasterStr);
     return NetUtils.createSocketAddr(hamaMasterStr);
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=986194&r1=986193&r2=986194&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Aug 17 05:35:03 2010
@@ -73,8 +73,7 @@ public class BSPPeer implements Watcher,
     id = conf.getInt(Constants.PEER_ID, 0);
     bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
         Constants.DEFAULT_ZOOKEEPER_ROOT);
-    zookeeperAddr = conf.get(Constants.ZOOKEEPER_SERVER_ADDRS,
-        "localhost:21810");
+    zookeeperAddr = "slave.udanax.org:21810"; // TODO: it should be configured at hama-site.xml
 
     reinitialize();
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=986194&r1=986193&r2=986194&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Aug 17 05:35:03 2010
@@ -19,6 +19,7 @@ package org.apache.hama.bsp;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -49,20 +51,21 @@ import org.apache.hama.ipc.InterTrackerP
 
 public class GroomServer implements Runnable {
   public static final Log LOG = LogFactory.getLog(GroomServer.class);
-  private BSPPeer bspPeer;
+  private static BSPPeer bspPeer;
   private Task task;
-  
+
   static {
     Configuration.addDefaultResource("hama-default.xml");
     Configuration.addDefaultResource("hama-site.xml");
   }
 
   Configuration conf;
+
   // Constants
   static enum State {
     NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
   };
-  
+
   // Running States and its related things
   volatile boolean running = true;
   volatile boolean shuttingDown = false;
@@ -71,18 +74,18 @@ public class GroomServer implements Runn
   GroomServerStatus status = null;
   short heartbeatResponseId = -1;
   private volatile int heartbeatInterval = 3 * 1000;
-  
+
   // Attributes
   String groomServerName;
-  String localHostname;  
+  String localHostname;
   InetSocketAddress bspMasterAddr;
   InterTrackerProtocol jobClient;
-  
+
   // Filesystem
-  //private LocalDirAllocator localDirAllocator;
+  // private LocalDirAllocator localDirAllocator;
   Path systemDirectory = null;
   FileSystem systemFS = null;
-  
+
   // Job
   boolean acceptNewTasks = true;
   private int failures;
@@ -92,20 +95,19 @@ public class GroomServer implements Runn
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
   Map<BSPJobID, RunningJob> runningJobs = null;
 
-  private BlockingQueue<GroomServerAction> tasksToCleanup = 
-    new LinkedBlockingQueue<GroomServerAction>();
-  
+  private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
+
   public GroomServer(Configuration conf) throws IOException {
     LOG.info("groom start");
     this.conf = conf;
     String mode = conf.get("bsp.master.address");
-    if(!mode.equals("local")) {
+    if (!mode.equals("local")) {
       bspMasterAddr = BSPMaster.getAddress(conf);
     }
     bspPeer = new BSPPeer(conf);
-    
-    //FileSystem local = FileSystem.getLocal(conf);
-    //this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
+
+    // FileSystem local = FileSystem.getLocal(conf);
+    // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
   }
 
   public synchronized void initialize() throws IOException {
@@ -114,29 +116,28 @@ public class GroomServer implements Runn
     }
 
     if (localHostname == null) {
-      this.localHostname = DNS.getDefaultHost(conf.get(
-          "bsp.dns.interface", "default"), conf.get(
-          "bsp.dns.nameserver", "default"));
+      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
+          "default"), conf.get("bsp.dns.nameserver", "default"));
     }
 
-    //check local disk
+    // check local disk
     checkLocalDirs(conf.getStrings("bsp.local.dir"));
     deleteLocalFiles("groomserver");
 
     // Clear out state tables
     this.tasks.clear();
-    this.runningJobs = new TreeMap<BSPJobID, RunningJob>();    
+    this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
     this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
     this.acceptNewTasks = true;
-    
+
     this.groomServerName = "groomd_" + localHostname;
     LOG.info("Starting tracker " + this.groomServerName);
 
     DistributedCache.purgeCache(this.conf);
 
     this.jobClient = (InterTrackerProtocol) RPC.waitForProxy(
-        InterTrackerProtocol.class, InterTrackerProtocol.versionID, bspMasterAddr,
-        conf);
+        InterTrackerProtocol.class, InterTrackerProtocol.versionID,
+        bspMasterAddr, conf);
     this.running = true;
     // this.bspPeer = new BSPPeer(this.conf);
   }
@@ -146,7 +147,7 @@ public class GroomServer implements Runn
     boolean writable = false;
 
     LOG.info(localDirs);
-    
+
     if (localDirs != null) {
       for (int i = 0; i < localDirs.length; i++) {
         try {
@@ -159,8 +160,8 @@ public class GroomServer implements Runn
       }
     }
 
-  //  if (!writable)
-      //throw new DiskErrorException("all local directories are not writable");
+    // if (!writable)
+    // throw new DiskErrorException("all local directories are not writable");
   }
 
   public String[] getLocalDirs() {
@@ -170,16 +171,17 @@ public class GroomServer implements Runn
   public void deleteLocalFiles() throws IOException {
     String[] localDirs = getLocalDirs();
     for (int i = 0; i < localDirs.length; i++) {
-      FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]),true);
+      FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true);
     }
   }
 
   public void deleteLocalFiles(String subdir) throws IOException {
-    try{ 
-    String[] localDirs = getLocalDirs();
-    for (int i = 0; i < localDirs.length; i++) {
-      FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir),true);
-    }
+    try {
+      String[] localDirs = getLocalDirs();
+      for (int i = 0; i < localDirs.length; i++) {
+        FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir),
+            true);
+      }
     } catch (NullPointerException e) {
       LOG.info(e);
     }
@@ -217,14 +219,14 @@ public class GroomServer implements Runn
 
         // Send the heartbeat and process the bspmaster's directives
         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
-        
+
         GroomServerAction[] actions = heartbeatResponse.getActions();
-        LOG.info("Got heartbeatResponse from BSPMaster with responseId: " + 
-            heartbeatResponse.getResponseId() + " and " + 
-            ((actions != null) ? actions.length : 0) + " actions");
+        LOG.info("Got heartbeatResponse from BSPMaster with responseId: "
+            + heartbeatResponse.getResponseId() + " and "
+            + ((actions != null) ? actions.length : 0) + " actions");
 
-        if (actions != null){ 
-          for(GroomServerAction action: actions) {
+        if (actions != null) {
+          for (GroomServerAction action : actions) {
             if (action instanceof LaunchTaskAction) {
               startNewTask((LaunchTaskAction) action);
             } else {
@@ -232,17 +234,17 @@ public class GroomServer implements Runn
             }
           }
         }
-        
+
         //
         // The heartbeat got through successfully!
         //
         heartbeatResponseId = heartbeatResponse.getResponseId();
-        
+
         // Note the time when the heartbeat returned, use this to decide when to
         // send the
         // next heartbeat
         lastHeartbeat = System.currentTimeMillis();
-        
+
         justStarted = false;
         justInited = false;
       } catch (InterruptedException ie) {
@@ -270,40 +272,41 @@ public class GroomServer implements Runn
     // TODO Auto-generated method stub
     task = action.getTask();
     this.launchTask();
-    //LOG.info("GroomServer: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
-    //LOG.info(t.runner);
-    //t.runner.start();
+    // LOG.info("GroomServer: " + t.getJobID() + ", " + t.getJobFile() + ", " +
+    // t.getId() + ", " + t.getPartition());
+    // LOG.info(t.runner);
+    // t.runner.start();
     // TODO: execute task
-    
+
   }
 
   private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
     // 
-    // Check if the last heartbeat got through... 
+    // Check if the last heartbeat got through...
     // if so then build the heartbeat information for the BSPMaster;
     // else resend the previous status information.
     //
     if (status == null) {
       synchronized (this) {
-        status = new GroomServerStatus(groomServerName, localHostname, 
+        status = new GroomServerStatus(groomServerName, localHostname,
             cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks);
       }
     } else {
-      LOG.info("Resending 'status' to '" + bspMasterAddr.getHostName() +
-        "' with reponseId '" + heartbeatResponseId+"'");
+      LOG.info("Resending 'status' to '" + bspMasterAddr.getHostName()
+          + "' with reponseId '" + heartbeatResponseId + "'");
     }
-    
+
     // TODO - Later, acceptNewTask is to be set by the status of groom server.
-    HeartbeatResponse heartbeatResponse = jobClient
-        .heartbeat(status,justStarted,justInited,true,heartbeatResponseId);
+    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
+        justStarted, justInited, true, heartbeatResponseId);
     return heartbeatResponse;
   }
-  
+
   private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() {
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
-    for(TaskInProgress tip: runningTasks.values()) {
+    for (TaskInProgress tip : runningTasks.values()) {
       TaskStatus status = tip.getStatus();
-      result.add((TaskStatus)status.clone());      
+      result.add((TaskStatus) status.clone());
     }
     return result;
   }
@@ -315,7 +318,7 @@ public class GroomServer implements Runn
       boolean denied = false;
       LOG.info("Why? " + running + ", " + shuttingDown + ", " + denied);
       while (running && !shuttingDown && !denied) {
-     
+
         boolean staleState = false;
         try {
           while (running && !staleState && !shuttingDown && !denied) {
@@ -369,62 +372,60 @@ public class GroomServer implements Runn
   }
 
   public static Thread startGroomServer(final GroomServer hrs) {
-    return startGroomServer(hrs,
-      "regionserver" + hrs.groomServerName);
+    return startGroomServer(hrs, "regionserver" + hrs.groomServerName);
   }
-  
-  public static Thread startGroomServer(final GroomServer hrs,
-      final String name) {
+
+  public static Thread startGroomServer(final GroomServer hrs, final String name) {
     Thread t = new Thread(hrs);
     t.setName(name);
     t.start();
     return t;
   }
-  
-  ///////////////////////////////////////////////////////
+
+  // /////////////////////////////////////////////////////
   // TaskInProgress maintains all the info for a Task that
-  // lives at this GroomServer.  It maintains the Task object,
+  // lives at this GroomServer. It maintains the Task object,
   // its TaskStatus, and the TaskRunner.
-  ///////////////////////////////////////////////////////
+  // /////////////////////////////////////////////////////
   class TaskInProgress {
     Task task;
     volatile boolean done = false;
     volatile boolean wasKilled = false;
     private TaskStatus taskStatus;
-    
+
     public TaskInProgress(Task task, BSPJobContext job) {
-      this.task = task;      
+      this.task = task;
     }
-    
+
     /**
      */
     public Task getTask() {
       return task;
     }
-    
+
     /**
      */
     public synchronized TaskStatus getStatus() {
       return taskStatus;
     }
-    
+
     /**
      */
     public TaskStatus.State getRunState() {
       return taskStatus.getRunState();
     }
-    
+
     public boolean wasKilled() {
       return wasKilled;
     }
-    
+
     @Override
     public boolean equals(Object obj) {
-      return (obj instanceof TaskInProgress) &&
-        task.getTaskID().equals
-        (((TaskInProgress) obj).getTask().getTaskID());
+      return (obj instanceof TaskInProgress)
+          && task.getTaskID().equals(
+              ((TaskInProgress) obj).getTask().getTaskID());
     }
-        
+
     @Override
     public int hashCode() {
       return task.getTaskID().hashCode();
@@ -434,16 +435,16 @@ public class GroomServer implements Runn
   public boolean isRunning() {
     return running;
   }
- 
-  public static GroomServer constructGroomServer(Class<? extends GroomServer> groomServerClass,
-      final Configuration conf2)  {
+
+  public static GroomServer constructGroomServer(
+      Class<? extends GroomServer> groomServerClass, final Configuration conf2) {
     try {
-      Constructor<? extends GroomServer> c =
-        groomServerClass.getConstructor(Configuration.class);
+      Constructor<? extends GroomServer> c = groomServerClass
+          .getConstructor(Configuration.class);
       return c.newInstance(conf2);
     } catch (Exception e) {
-      throw new RuntimeException("Failed construction of " +
-        "Master: " + groomServerClass.toString(), e);
+      throw new RuntimeException("Failed construction of " + "Master: "
+          + groomServerClass.toString(), e);
     }
   }
 
@@ -456,11 +457,14 @@ public class GroomServer implements Runn
 
     try {
       Configuration conf = new HamaConfiguration();
-      conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
-      conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
-      
-      conf.set(Constants.PEER_PORT, String.valueOf(30000));
-      GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);
+      conf.addResource(new Path(
+          "/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
+      conf.addResource(new Path(
+          "/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
+
+      // conf.set(Constants.PEER_PORT, String.valueOf(30000));
+      GroomServer groom = GroomServer.constructGroomServer(GroomServer.class,
+          conf);
       startGroomServer(groom);
     } catch (Throwable e) {
       LOG.fatal(StringUtils.stringifyException(e));
@@ -473,21 +477,24 @@ public class GroomServer implements Runn
   }
 
   public void launchTask() {
-    Configuration jobConf = new Configuration();
-    jobConf.addResource(new Path(task.getJobFile().replace("file:", "")));
-    BSP bsp = (BSP) ReflectionUtils.newInstance(jobConf.getClass("bsp.work.class",
-        BSP.class), conf);
-    bsp.setPeer(bspPeer);
-    bsp.start();
-    while(!bsp.isAlive()) {
-      LOG.info("i'm done. ");
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+    // TODO: task localizing and execute them.
+    
+    try {
+      LOG.info(">>>>>>>>>>>>> " + systemFS.isFile(new Path(task.getJobFile())));
+      //LOG.info("bsp.work.class: " + conf.getClass("bsp.work.class", BSP.class));
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
     }
+    
+    /*
+     * try { jobConf.addResource(new Path(task.getJobFile().replace("file:",
+     * ""))); LOG.info("Job File>>>>> " + task.getJobFile().replace("file:",
+     * "")); BSP bsp = (BSP)
+     * ReflectionUtils.newInstance(jobConf.getClass("bsp.work.class",
+     * BSP.class), conf); bsp.setPeer(bspPeer); bsp.start(); } catch (Exception
+     * e) { System.exit(-1); }
+     */
   }
 
   public String getServerName() {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=986194&r1=986193&r2=986194&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Tue Aug 17 05:35:03 2010
@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hama.Constants;
 
 /**
  *

Added: incubator/hama/trunk/src/java/org/apache/hama/util/RunJar.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/RunJar.java?rev=986194&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/RunJar.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/RunJar.java Tue Aug 17 05:35:03 2010
@@ -0,0 +1,129 @@
+package org.apache.hama.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import org.apache.hadoop.fs.FileUtil;
+
+public class RunJar {
+
+  /** Unpack a jar file into a directory. */
+  public static void unJar(File jarFile, File toDir) throws IOException {
+    JarFile jar = new JarFile(jarFile);
+    try {
+      Enumeration entries = jar.entries();
+      while (entries.hasMoreElements()) {
+        JarEntry entry = (JarEntry)entries.nextElement();
+        if (!entry.isDirectory()) {
+          InputStream in = jar.getInputStream(entry);
+          try {
+            File file = new File(toDir, entry.getName());
+            file.getParentFile().mkdirs();
+            OutputStream out = new FileOutputStream(file);
+            try {
+              byte[] buffer = new byte[8192];
+              int i;
+              while ((i = in.read(buffer)) != -1) {
+                out.write(buffer, 0, i);
+              }
+            } finally {
+              out.close();
+            }
+          } finally {
+            in.close();
+          }
+        }
+      }
+    } finally {
+      jar.close();
+    }
+  }
+
+  /** Run a Hadoop job jar.  If the main class is not in the jar's manifest,
+   * then it must be provided on the command line. */
+  public static void main(String[] args) throws Throwable {
+    String usage = "RunJar jarFile [mainClass] args...";
+
+    if (args.length < 1) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    int firstArg = 0;
+    String fileName = args[firstArg++];
+    File file = new File(fileName);
+    String mainClassName = null;
+
+    JarFile jarFile = new JarFile(fileName);
+    Manifest manifest = jarFile.getManifest();
+    if (manifest != null) {
+      mainClassName = manifest.getMainAttributes().getValue("Main-Class");
+    }
+    jarFile.close();
+
+    if (mainClassName == null) {
+      if (args.length < 2) {
+        System.err.println(usage);
+        System.exit(-1);
+      }
+      mainClassName = args[firstArg++];
+    }
+    mainClassName = mainClassName.replaceAll("/", ".");
+
+    final File workDir = File.createTempFile("hama-unjar","");
+    workDir.delete();
+    workDir.mkdirs();
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+        public void run() {
+          try {
+            FileUtil.fullyDelete(workDir);
+          } catch (IOException e) {
+          }
+        }
+      });
+
+    unJar(file, workDir);
+    
+    ArrayList classPath = new ArrayList();
+    classPath.add(new File(workDir+"/").toURL());
+    classPath.add(file.toURL());
+    classPath.add(new File(workDir, "classes/").toURL());
+    File[] libs = new File(workDir, "lib").listFiles();
+    if (libs != null) {
+      for (int i = 0; i < libs.length; i++) {
+        classPath.add(libs[i].toURL());
+      }
+    }
+    ClassLoader loader =
+      new URLClassLoader((URL[])classPath.toArray(new URL[0]));
+
+    Thread.currentThread().setContextClassLoader(loader);
+    Class mainClass = loader.loadClass(mainClassName);
+    Method main = mainClass.getMethod("main", new Class[] {
+      Array.newInstance(String.class, 0).getClass()
+    });
+    String[] newArgs = (String[])Arrays.asList(args)
+      .subList(firstArg, args.length).toArray(new String[0]);
+    try {
+      main.invoke(null, new Object[] { newArgs });
+    } catch (InvocationTargetException e) {
+      throw e.getTargetException();
+    }
+  }
+  
+}



Mime
View raw message