incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r881080 - in /incubator/hama/trunk: bin/ conf/ src/java/ src/java/org/apache/hama/ src/java/org/apache/hama/graph/ src/java/org/apache/hama/ipc/
Date Tue, 17 Nov 2009 01:21:28 GMT
Author: edwardyoon
Date: Tue Nov 17 01:21:27 2009
New Revision: 881080

URL: http://svn.apache.org/viewvc?rev=881080&view=rev
Log:
Initial codebase import for Angrapa, a graph computing framework based on BSP.

Added:
    incubator/hama/trunk/bin/groomservers.sh
    incubator/hama/trunk/bin/hama-daemon.sh
    incubator/hama/trunk/bin/hama-daemons.sh
    incubator/hama/trunk/bin/start-all.sh
    incubator/hama/trunk/bin/stop-all.sh
    incubator/hama/trunk/conf/groomservers
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/src/java/groomserver-default.xml
    incubator/hama/trunk/src/java/org/apache/hama/HamaMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/ID.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/JobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/JobID.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/JobStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/
    incubator/hama/trunk/src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/HeartbeatResponse.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
Modified:
    incubator/hama/trunk/bin/hama
    incubator/hama/trunk/bin/hama-config.sh
    incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java

Added: incubator/hama/trunk/bin/groomservers.sh
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/groomservers.sh?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/bin/groomservers.sh (added)
+++ incubator/hama/trunk/bin/groomservers.sh Tue Nov 17 01:21:27 2009
@@ -0,0 +1,74 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2007 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+# 
+# Run a shell command on all regionserver hosts.
+#
+# Environment Variables
+#
+#   HAMA_GROOMSERVERS    File naming remote hosts.
+#     Default is ${HADOOP_CONF_DIR}/regionservers
+#   HADOOP_CONF_DIR  Alternate conf dir. Default is ${HADOOP_HOME}/conf.
+#   HAMA_CONF_DIR  Alternate hbase conf dir. Default is ${HBASE_HOME}/conf.
+#   HADOOP_SLAVE_SLEEP Seconds to sleep between spawning remote commands.
+#   HADOOP_SSH_OPTS Options passed to ssh when running remote commands.
+#
+# Modelled after $HADOOP_HOME/bin/slaves.sh.
+
+usage="Usage: groomservers [--config <hama-confdir>] command..."
+
+# if no args specified, show usage
+if [ $# -le 0 ]; then
+  echo $usage
+  exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/hama-config.sh
+
+# If the groomservers file is specified in the command line,
+# then it takes precedence over the definition in 
+# hama-env.sh. Save it here.
+HOSTLIST=$HAMA_GROOMSERVERS
+
+if [ -f "${HAMA_CONF_DIR}/hama-env.sh" ]; then
+  . "${HAMA_CONF_DIR}/hama-env.sh"
+fi
+
+if [ "$HOSTLIST" = "" ]; then
+  if [ "$HAMA_GROOMSERVERS" = "" ]; then
+    export HOSTLIST="${HAMA_CONF_DIR}/groomservers"
+  else
+    export HOSTLIST="${HAMA_GROOMSERVERS}"
+  fi
+fi
+
+for groomserver in `cat "$HOSTLIST"`; do
+ ssh $HAMA_SSH_OPTS $groomserver $"${@// /\\ }" \
+   2>&1 | sed "s/^/$groomserver: /" &
+ if [ "$HAMA_SLAVE_SLEEP" != "" ]; then
+   sleep $HAMA_SLAVE_SLEEP
+ fi
+done
+
+wait

Modified: incubator/hama/trunk/bin/hama
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/hama?rev=881080&r1=881079&r2=881080&view=diff
==============================================================================
--- incubator/hama/trunk/bin/hama (original)
+++ incubator/hama/trunk/bin/hama Tue Nov 17 01:21:27 2009
@@ -55,6 +55,8 @@
 if [ $# = 0 ]; then
   echo "Usage: hama <command>"
   echo "where <command> is one of:"
+  echo "  master           run the master server"
+  echo "  groomserver      run the groom server"
   echo "  examples         run the HAMA examples"
   echo " or"
   echo "  CLASSNAME        run the class named CLASSNAME"
@@ -76,7 +78,7 @@
   #echo "run java in $JAVA_HOME"
   JAVA_HOME=$JAVA_HOME
 fi
-  
+
 if [ "$JAVA_HOME" = "" ]; then
   echo "Error: JAVA_HOME is not set."
   exit 1
@@ -184,7 +186,11 @@
 
 # figure out which class to run
 if [ "$COMMAND" = "examples" ] ; then
-  CLASS="org.apache.hama.examples.ExampleDriver" 
+  CLASS="org.apache.hama.examples.ExampleDriver"
+elif [ "$COMMAND" = "master" ] ; then
+  CLASS='org.apache.hama.HamaMaster'
+elif [ "$COMMAND" = "groomserver" ] ; then
+  CLASS='org.apache.hama.graph.GroomServer'
 else
   CLASS=$COMMAND
 fi

Modified: incubator/hama/trunk/bin/hama-config.sh
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/hama-config.sh?rev=881080&r1=881079&r2=881080&view=diff
==============================================================================
--- incubator/hama/trunk/bin/hama-config.sh (original)
+++ incubator/hama/trunk/bin/hama-config.sh Tue Nov 17 01:21:27 2009
@@ -55,11 +55,19 @@
     confdir=$1
     shift
     HAMA_CONF_DIR=$confdir
+  elif [ "--hosts" = "$1" ]
+  then
+    shift
+    hosts=$1
+    shift
+    HAMA_GROOMSERVERS=$hosts
   else
     # Presume we are at end of options and break
     break
   fi
 done
- 
+
 # Allow alternate hama conf dir location.
 HAMA_CONF_DIR="${HAMA_CONF_DIR:-$HAMA_HOME/conf}"
+# List of hama groom servers.
+HAMA_GROOMSERVERS="${HAMA_GROOMSERVERS:-$HAMA_CONF_DIR/groomservers}"
\ No newline at end of file

Added: incubator/hama/trunk/bin/hama-daemon.sh
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/hama-daemon.sh?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/bin/hama-daemon.sh (added)
+++ incubator/hama/trunk/bin/hama-daemon.sh Tue Nov 17 01:21:27 2009
@@ -0,0 +1,169 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2007 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+# 
+# Runs a Hadoop hbase command as a daemon.
+#
+# Environment Variables
+#
+#   HAMA_CONF_DIR   Alternate hbase conf dir. Default is ${HAMA_HOME}/conf.
+#   HAMA_LOG_DIR    Where log files are stored.  PWD by default.
+#   HAMA_PID_DIR    The pid files are stored. /tmp by default.
+#   HAMA_IDENT_STRING   A string representing this instance of hadoop. $USER by default
+#   HAMA_NICENESS The scheduling priority for daemons. Defaults to 0.
+#
+# Modelled after $HADOOP_HOME/bin/hadoop-daemon.sh
+
+usage="Usage: hama-daemon.sh [--config <conf-dir>]\
+ (start|stop) <hama-command> \
+ <args...>"
+
+# if no args specified, show usage
+if [ $# -le 1 ]; then
+  echo $usage
+  exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/hama-config.sh
+
+# get arguments
+startStop=$1
+shift
+
+command=$1
+shift
+
+hama_rotate_log ()
+{
+    log=$1;
+    num=5;
+    if [ -n "$2" ]; then
+    num=$2
+    fi
+    if [ -f "$log" ]; then # rotate logs
+    while [ $num -gt 1 ]; do
+        prev=`expr $num - 1`
+        [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
+        num=$prev
+    done
+    mv "$log" "$log.$num";
+    fi
+}
+
+if [ -f "${HAMA_CONF_DIR}/hama-env.sh" ]; then
+  . "${HAMA_CONF_DIR}/hama-env.sh"
+fi
+
+# get log directory
+if [ "$HAMA_LOG_DIR" = "" ]; then
+  export HAMA_LOG_DIR="$HAMA_HOME/logs"
+fi
+mkdir -p "$HAMA_LOG_DIR"
+
+if [ "$HAMA_PID_DIR" = "" ]; then
+  HAMA_PID_DIR=/tmp
+fi
+
+if [ "$HAMA_IDENT_STRING" = "" ]; then
+  export HAMA_IDENT_STRING="$USER"
+fi
+
+# Some variables
+# Work out java location so can print version into log.
+if [ "$JAVA_HOME" != "" ]; then
+  #echo "run java in $JAVA_HOME"
+  JAVA_HOME=$JAVA_HOME
+fi
+if [ "$JAVA_HOME" = "" ]; then
+  echo "Error: JAVA_HOME is not set."
+  exit 1
+fi
+JAVA=$JAVA_HOME/bin/java
+export HAMA_LOGFILE=hama-$HAMA_IDENT_STRING-$command-$HOSTNAME.log
+export HAMA_ROOT_LOGGER="INFO,DRFA"
+logout=$HAMA_LOG_DIR/hama-$HAMA_IDENT_STRING-$command-$HOSTNAME.out  
+loglog="${HAMA_LOG_DIR}/${HAMA_LOGFILE}"
+pid=$HAMA_PID_DIR/hama-$HAMA_IDENT_STRING-$command.pid
+
+# Set default scheduling priority
+if [ "$HAMA_NICENESS" = "" ]; then
+    export HAMA_NICENESS=0
+fi
+
+case $startStop in
+
+  (start)
+    mkdir -p "$HAMA_PID_DIR"
+    if [ -f $pid ]; then
+      if kill -0 `cat $pid` > /dev/null 2>&1; then
+        echo $command running as process `cat $pid`.  Stop it first.
+        exit 1
+      fi
+    fi
+
+    hama_rotate_log $logout
+    echo starting $command, logging to $logout
+    # Add to the command log file vital stats on our environment.
+    echo "`date` Starting $command on `hostname`" >> $loglog
+    echo "ulimit -n `ulimit -n`" >> $loglog 2>&1
+    nohup nice -n $HAMA_NICENESS "$HAMA_HOME"/bin/hama \
+        --config "${HAMA_CONF_DIR}" \
+        $command $startStop "$@" > "$logout" 2>&1 < /dev/null &
+    echo $! > $pid
+    sleep 1; head "$logout"
+    ;;
+
+  (stop)
+    if [ -f $pid ]; then
+      if kill -0 `cat $pid` > /dev/null 2>&1; then
+        echo -n stopping $command
+        echo "`date` Stopping $command" >> $loglog
+        if [ "$command" = "master" ]; then
+          nohup nice -n $HAMA_NICENESS "$HAMA_HOME"/bin/hama \
+              --config "${HAMA_CONF_DIR}" \
+              $command $startStop "$@" > "$logout" 2>&1 < /dev/null &
+        else
+          echo "`date` Killing $command" >> $loglog
+          kill `cat $pid` > /dev/null 2>&1
+        fi
+        while kill -0 `cat $pid` > /dev/null 2>&1; do
+          echo -n "."
+          sleep 1;
+        done
+        echo
+      else
+        retval=$?
+        echo no $command to stop because kill of pid `cat $pid` failed with status $retval
+      fi
+    else
+      echo no $command to stop because no pid file $pid
+    fi
+    ;;
+
+  (*)
+    echo $usage
+    exit 1
+    ;;
+
+esac

Added: incubator/hama/trunk/bin/hama-daemons.sh
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/hama-daemons.sh?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/bin/hama-daemons.sh (added)
+++ incubator/hama/trunk/bin/hama-daemons.sh Tue Nov 17 01:21:27 2009
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2007 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+# 
+# Run a hama command on all slave hosts.
+# Modelled after $HADOOP_HOME/bin/hadoop-daemons.sh
+
+usage="Usage: hama-daemons.sh [--config <hama-confdir>] \
+ [--hosts groomserversfile] [start|stop] command args..."
+
+# if no args specified, show usage
+if [ $# -le 1 ]; then
+  echo $usage
+  exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. $bin/hama-config.sh
+
+remote_cmd="cd ${HAMA_HOME}; $bin/hama-daemon.sh --config ${HAMA_CONF_DIR} $@"
+args="--config ${HAMA_CONF_DIR} $remote_cmd"
+
+command=$2
+case $command in
+  (zookeeper)
+    exec "$bin/zookeepers.sh" $args
+    ;;
+  (*)
+    exec "$bin/groomservers.sh" $args
+    ;;
+esac

Added: incubator/hama/trunk/bin/start-all.sh
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/start-all.sh?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/bin/start-all.sh (added)
+++ incubator/hama/trunk/bin/start-all.sh Tue Nov 17 01:21:27 2009
@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2007 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+#
+# Start hama daemons.
+# Run this on master node.
+usage="Usage: start-all.sh"
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/hama-config.sh
+
+# start hama daemons
+errCode=$?
+if [ $errCode -ne 0 ]
+then
+  exit $errCode
+fi
+
+"$bin"/hama-daemon.sh --config "${HAMA_CONF_DIR}" start master 
+"$bin"/hama-daemons.sh --config "${HAMA_CONF_DIR}" \
+  --hosts "${HAMA_GROOMSERVERS}" start groomserver

Added: incubator/hama/trunk/bin/stop-all.sh
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/stop-all.sh?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/bin/stop-all.sh (added)
+++ incubator/hama/trunk/bin/stop-all.sh Tue Nov 17 01:21:27 2009
@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2007 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/hama-config.sh
+
+#"$bin"/hama-daemon.sh --config "${HAMA_CONF_DIR}" stop master
+#"$bin"/hama-daemons.sh --config "${HAMA_CONF_DIR}" stop groomserver

Added: incubator/hama/trunk/conf/groomservers
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/groomservers?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/conf/groomservers (added)
+++ incubator/hama/trunk/conf/groomservers Tue Nov 17 01:21:27 2009
@@ -0,0 +1 @@
+localhost
\ No newline at end of file

Added: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (added)
+++ incubator/hama/trunk/conf/hama-default.xml Tue Nov 17 01:21:27 2009
@@ -0,0 +1,37 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>hama.master.port</name>
+    <value>40000</value>
+    <description>The port master should bind to.</description>
+  </property>
+
+  <property>
+    <name>hama.groom.port</name>
+    <value>40020</value>
+    <description>The port an groom server binds to.
+    </description>
+  </property>
+</configuration>
\ No newline at end of file

Added: incubator/hama/trunk/src/java/groomserver-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/groomserver-default.xml?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/groomserver-default.xml (added)
+++ incubator/hama/trunk/src/java/groomserver-default.xml Tue Nov 17 01:21:27 2009
@@ -0,0 +1,13 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Do not modify this file directly.  Instead, copy entries that you -->
+<!-- wish to modify from this file into mapred-site.xml and change them -->
+<!-- there.  If mapred-site.xml does not already exist, create it.      -->
+
+<configuration>
+<property>
+  <name>hama.groomserver.local.dir</name>
+  <value>${hadoop.tmp.dir}/groomserver/local</value>
+</property>
+</configuration>

Modified: incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java?rev=881080&r1=881079&r2=881080&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/HamaConfiguration.java Tue Nov 17 01:21:27 2009
@@ -82,6 +82,7 @@
    * Adds Hama configuration files to a Configuration
    */
   private void addHamaResources() {
+    addResource("hama-default.xml");
     addResource("hama-site.xml");
   }
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/HamaMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/HamaMaster.java?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/HamaMaster.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/HamaMaster.java Tue Nov 17 01:21:27 2009
@@ -0,0 +1,216 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.graph.JobID;
+import org.apache.hama.graph.JobStatus;
+import org.apache.hama.ipc.HeartbeatResponse;
+import org.apache.hama.ipc.InterTrackerProtocol;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+
+public class HamaMaster implements JobSubmissionProtocol, InterTrackerProtocol {
+  static{
+    Configuration.addDefaultResource("groomserver-default.xml");
+  }
+  
+  public static final Log LOG = LogFactory.getLog(HamaMaster.class);
+  
+  private HamaConfiguration conf;  
+  public static enum State { INITIALIZING, RUNNING }
+  State state = State.INITIALIZING;
+  
+  String masterIdentifier;
+  
+  private Server interTrackerServer;  
+  
+  FileSystem fs = null;
+  Path systemDir = null;
+  
+  // system directories are world-wide readable and owner readable
+  final static FsPermission SYSTEM_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0733); // rwx-wx-wx
+
+  // system files should have 700 permission
+  final static FsPermission SYSTEM_FILE_PERMISSION =
+    FsPermission.createImmutable((short) 0700); // rwx------
+  
+  private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+  
+  private int nextJobId = 1;
+  
+  public HamaMaster(HamaConfiguration conf, String identifier) throws IOException, InterruptedException {
+    this.conf = conf;
+    
+    this.masterIdentifier = identifier;
+    
+    InetSocketAddress addr = getAddress(conf);    
+    this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), conf);
+    
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        if (fs == null) {
+          fs = FileSystem.get(conf);
+        }
+        // clean up the system dir, which will only work if hdfs is out of 
+        // safe mode
+        if(systemDir == null) {
+          systemDir = new Path(getSystemDir());    
+        }
+
+        LOG.info("Cleaning up the system directory");
+        fs.delete(systemDir, true);
+        if (FileSystem.mkdirs(fs, systemDir, 
+            new FsPermission(SYSTEM_DIR_PERMISSION))) {
+          break;
+        }
+        LOG.error("Mkdirs failed to create " + systemDir);
+
+      } catch (AccessControlException ace) {
+        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir 
+            + ") because of permissions.");
+        LOG.warn("Manually delete the mapred.system.dir (" + systemDir 
+            + ") and then start the JobTracker.");
+        LOG.warn("Bailing out ... ");
+        throw ace;
+      } catch (IOException ie) {
+        LOG.info("problem cleaning system directory: " + systemDir, ie);
+      }
+      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+    }
+    
+    // deleteLocalFiles(SUBDIR);
+  }
+  
+  public static HamaMaster startMaster(HamaConfiguration conf) throws IOException,
+  InterruptedException {
+    return startTracker(conf, generateNewIdentifier());
+  }
+  
+  public static HamaMaster startTracker(HamaConfiguration conf, String identifier) 
+  throws IOException, InterruptedException {
+    
+    HamaMaster result = null;
+    result = new HamaMaster(conf, identifier);
+    
+    return result;
+  }
+  
+  public static InetSocketAddress getAddress(Configuration conf) {
+    String hamaMasterStr = conf.get("hama.master.address", "localhost:40000");
+    return NetUtils.createSocketAddr(hamaMasterStr);
+  }
+  
+  public int getPort() {
+    return this.conf.getInt("hama.master.port", 0);
+  }
+
+  public HamaConfiguration getConfiguration() {
+    return this.conf;
+  }
+  
+  private static SimpleDateFormat getDateFormat() {
+    return new SimpleDateFormat("yyyyMMddHHmm");
+  }
+
+  private static String generateNewIdentifier() {
+    return getDateFormat().format(new Date());
+  }
+  
+  public void offerService() throws InterruptedException, IOException {
+    this.interTrackerServer.start();
+    
+    synchronized (this) {
+      state = State.RUNNING;
+    }
+    LOG.info("Starting RUNNING");
+    
+    this.interTrackerServer.join();
+    LOG.info("Stopped interTrackerServer");
+  }
+  
+  
+  public static void main(String [] args) {
+    StringUtils.startupShutdownMessage(HamaMaster.class, args, LOG);
+    if (args.length != 1) {
+      System.out.println("usage: HamaMaster");
+      System.exit(-1);
+    }
+      
+    try {
+      HamaConfiguration conf = new HamaConfiguration();
+      HamaMaster master = startMaster(conf);
+      master.offerService();
+    } catch (Throwable e) {
+      LOG.fatal(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {    
+    if (protocol.equals(InterTrackerProtocol.class.getName())) {
+      return InterTrackerProtocol.versionID;
+    } else if (protocol.equals(JobSubmissionProtocol.class.getName())){
+      return JobSubmissionProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol to job tracker: " + protocol);
+    }
+  }
+
+  @Override
+  public HeartbeatResponse heartbeat(short responseId) {
+    LOG.debug(">>> return the heartbeat message.");
+    return new HeartbeatResponse((short)1);
+  }
+
+  @Override
+  public String getSystemDir() {
+    Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
+    return fs.makeQualified(sysDir).toString();
+  }
+
+  @Override
+  public JobID getNewJobId() throws IOException {
+    return new JobID(this.masterIdentifier, nextJobId++);    
+  }
+
+  @Override
+  public JobStatus submitJob(JobID jobName) throws IOException {
+    
+    return null;
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java Tue Nov 17 01:21:27 2009
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.HamaMaster;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.ipc.HeartbeatResponse;
+import org.apache.hama.ipc.InterTrackerProtocol;
+
+public class GroomServer implements Runnable {
+  public static final Log LOG = LogFactory.getLog(GroomServer.class);
+
+  static {
+    Configuration.addDefaultResource("groomserver-default.xml");
+  }
+
+  static enum State {
+    NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
+  };
+
+  HamaConfiguration conf;
+
+  volatile boolean running = true;
+  volatile boolean shuttingDown = false;
+  boolean justInited = true;
+
+  String groomserverName;
+  String localHostname;
+
+  InetSocketAddress masterAddr;
+  InterTrackerProtocol jobClient;
+  BSPPeer bspPeer;
+
+  short heartbeatResponseId = -1;
+  private volatile int heartbeatInterval = 3 * 1000;
+
+  private LocalDirAllocator localDirAllocator;
+  Path systemDirectory = null;
+  FileSystem systemFS = null;
+
+  public GroomServer(HamaConfiguration conf) throws IOException {
+    this.conf = conf;
+    masterAddr = HamaMaster.getAddress(conf);
+
+    FileSystem local = FileSystem.getLocal(conf);
+    this.localDirAllocator = new LocalDirAllocator("hama.groomserver.local.dir");
+
+    initialize();
+  }
+
+  synchronized void initialize() throws IOException {
+    if (this.conf.get("slave.host.name") != null) {
+      this.localHostname = conf.get("slave.host.name");
+    }
+
+    if (localHostname == null) {
+      this.localHostname = DNS.getDefaultHost(conf.get(
+          "hama.groomserver.dns.interface", "default"), conf.get(
+          "hama.groomserver.dns.nameserver", "default"));
+    }
+
+    checkLocalDirs(conf.getStrings("hama.groomserver.local.dir"));
+    deleteLocalFiles("groomserver");
+
+    this.groomserverName = "groomserver_" + localHostname;
+    LOG.info("Starting tracker " + this.groomserverName);
+
+    DistributedCache.purgeCache(this.conf);
+
+    this.jobClient = (InterTrackerProtocol) RPC.waitForProxy(
+        InterTrackerProtocol.class, InterTrackerProtocol.versionID, masterAddr,
+        conf);
+    this.running = true;
+    // this.bspPeer = new BSPPeer(this.conf);
+  }
+
+  private static void checkLocalDirs(String[] localDirs)
+      throws DiskErrorException {
+    boolean writable = false;
+
+    if (localDirs != null) {
+      for (int i = 0; i < localDirs.length; i++) {
+        try {
+          DiskChecker.checkDir(new File(localDirs[i]));
+          writable = true;
+        } catch (DiskErrorException e) {
+          LOG.warn("Graph Processor local " + e.getMessage());
+        }
+      }
+    }
+
+    if (!writable)
+      throw new DiskErrorException("all local directories are not writable");
+  }
+
+  public String[] getLocalDirs() {
+    return conf.getStrings("hama.groomserver.local.dir");
+  }
+
+  public void deleteLocalFiles() throws IOException {
+    String[] localDirs = getLocalDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]));
+    }
+  }
+
+  public void deleteLocalFiles(String subdir) throws IOException {
+    String[] localDirs = getLocalDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir));
+    }
+  }
+
+  public void cleanupStorage() throws IOException {
+    deleteLocalFiles();
+  }
+
+  private void startCleanupThreads() throws IOException {
+
+  }
+
+  public State offerService() throws Exception {
+    long lastHeartbeat = 0;
+
+    while (running && !shuttingDown) {
+      try {
+        long now = System.currentTimeMillis();
+
+        long waitTime = heartbeatInterval - (now - lastHeartbeat);
+        if (waitTime > 0) {
+          // sleeps for the wait time
+          Thread.sleep(waitTime);
+        }
+
+        if (justInited) {
+          String dir = jobClient.getSystemDir();
+          if (dir == null) {
+            throw new IOException("Failed to get system directory");
+          }
+          systemDirectory = new Path(dir);
+          systemFS = systemDirectory.getFileSystem(conf);
+        }
+
+        // Send the heartbeat and process the jobtracker's directives
+        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
+
+        // Note the time when the heartbeat returned, use this to decide when to
+        // send the
+        // next heartbeat
+        lastHeartbeat = System.currentTimeMillis();
+
+        justInited = false;
+      } catch (InterruptedException ie) {
+        LOG.info("Interrupted. Closing down.");
+        return State.INTERRUPTED;
+      } catch (DiskErrorException de) {
+        String msg = "Exiting task tracker for disk error:\n"
+            + StringUtils.stringifyException(de);
+        LOG.error(msg);
+
+        return State.STALE;
+      } catch (RemoteException re) {
+        return State.DENIED;
+      } catch (Exception except) {
+        String msg = "Caught exception: "
+            + StringUtils.stringifyException(except);
+        LOG.error(msg);
+      }
+    }
+
+    return State.NORMAL;
+  }
+
+  private class WalkerLauncher extends Thread {
+    // TODO:
+  }
+
+  private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+    HeartbeatResponse heartbeatResponse = jobClient
+        .heartbeat(heartbeatResponseId);
+    return heartbeatResponse;
+  }
+
+  @Override
+  public void run() {
+    try {
+      startCleanupThreads();
+      boolean denied = false;
+      while (running && !shuttingDown && !denied) {
+        boolean staleState = false;
+        try {
+          while (running && !staleState && !shuttingDown && !denied) {
+            try {
+              State osState = offerService();
+              if (osState == State.STALE) {
+                staleState = true;
+              } else if (osState == State.DENIED) {
+                denied = true;
+              }
+            } catch (Exception e) {
+              if (!shuttingDown) {
+                LOG.info("Lost connection to GraphProcessor [" + masterAddr
+                    + "].  Retrying...", e);
+                try {
+                  Thread.sleep(5000);
+                } catch (InterruptedException ie) {
+                }
+              }
+            }
+          }
+        } finally {
+          // close();
+        }
+
+        if (shuttingDown) {
+          return;
+        }
+        LOG.warn("Reinitializing local state");
+        initialize();
+      }
+    } catch (IOException ioe) {
+      LOG.error("Got fatal exception while reinitializing TaskTracker: "
+          + StringUtils.stringifyException(ioe));
+      return;
+    }
+  }
+
+  public synchronized void shutdown() throws IOException {
+    shuttingDown = true;
+    close();
+  }
+
+  public synchronized void close() throws IOException {
+    this.running = false;
+
+    cleanupStorage();
+
+    // shutdown RPC connections
+    RPC.stopProxy(jobClient);
+  }
+
+  public static void main(String[] args) {
+    StringUtils.startupShutdownMessage(GroomServer.class, args, LOG);
+    if (args.length != 1) {
+      System.out.println("usage: GroomServer");
+      System.exit(-1);
+    }
+
+    try {
+      HamaConfiguration conf = new HamaConfiguration();
+      new GroomServer(conf).run();
+    } catch (Throwable e) {
+      LOG.fatal(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/graph/ID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/ID.java?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/ID.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/ID.java Tue Nov 17 01:21:27 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public abstract class ID implements WritableComparable<ID> {
+  protected static final char SEPARATOR = '_';
+  protected int id;
+
+  public ID(int id) {
+    this.id = id;
+  }
+
+  protected ID() {
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(id);
+  }
+
+  @Override
+  public int hashCode() {
+    return Integer.valueOf(id).hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if(o == null)
+      return false;
+    if (o.getClass() == this.getClass()) {
+      ID that = (ID) o;
+      return this.id == that.id;
+    }
+    else
+      return false;
+  }
+
+  public int compareTo(ID that) {
+    return this.id - that.id;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.id = in.readInt();
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/graph/JobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/JobClient.java?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/JobClient.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/JobClient.java Tue Nov 17 01:21:27 2009
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.HamaMaster;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+
+public class JobClient extends Configured {
+  private static final Log LOG = LogFactory.getLog(JobClient.class);
+
+  public static enum TaskStatusFilter {
+    NONE, KILLED, FAILED, SUCCEEDED, ALL
+  }
+
+  static {
+    Configuration.addDefaultResource("groomserver-default.xml");
+  }
+
+  private JobSubmissionProtocol jobSubmitClient;
+
+  public JobClient() {
+  }
+
+  public JobClient(HamaConfiguration conf) throws IOException {
+    setConf(conf);
+    init(conf);
+  }
+
+  public void init(HamaConfiguration conf) throws IOException {
+    String tracker = conf.get("hama.master.address", "local");
+    this.jobSubmitClient = createRPCProxy(HamaMaster.getAddress(conf), conf);
+  }
+
+  private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
+      Configuration conf) throws IOException {
+    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+        JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory(
+            conf, JobSubmissionProtocol.class));
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/graph/JobID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/JobID.java?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/JobID.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/JobID.java Tue Nov 17 01:21:27 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.io.Text;
+
+public class JobID extends ID implements Comparable<ID> {
+  protected static final String JOB = "job";
+  private final Text jtIdentifier;
+
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(4);
+  }
+
+  public JobID(String jtIdentifier, int id) {
+    super(id);
+    this.jtIdentifier = new Text(jtIdentifier);
+  }
+
+  public JobID() {
+    jtIdentifier = new Text();
+  }
+
+  public String getJtIdentifier() {
+    return jtIdentifier.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!super.equals(o))
+      return false;
+
+    JobID that = (JobID) o;
+    return this.jtIdentifier.equals(that.jtIdentifier);
+  }
+
+  @Override
+  public int compareTo(ID o) {
+    JobID that = (JobID) o;
+    int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
+    if (jtComp == 0) {
+      return this.id - that.id;
+    } else
+      return jtComp;
+  }
+
+  public StringBuilder appendTo(StringBuilder builder) {
+    builder.append(SEPARATOR);
+    builder.append(jtIdentifier);
+    builder.append(SEPARATOR);
+    builder.append(idFormat.format(id));
+    return builder;
+  }
+
+  @Override
+  public int hashCode() {
+    return jtIdentifier.hashCode() + id;
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(JOB)).toString();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.jtIdentifier.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    jtIdentifier.write(out);
+  }
+
+  public static JobID forName(String str) throws IllegalArgumentException {
+    if (str == null)
+      return null;
+    try {
+      String[] parts = str.split("_");
+      if (parts.length == 3) {
+        if (parts[0].equals(JOB)) {
+          return new JobID(parts[1], Integer.parseInt(parts[2]));
+        }
+      }
+    } catch (Exception ex) {
+    }
+    throw new IllegalArgumentException("JobId string : " + str
+        + " is not properly formed");
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/graph/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/JobStatus.java?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/JobStatus.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/JobStatus.java Tue Nov 17 01:21:27 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+public class JobStatus implements Writable, Cloneable {
+
+  static {
+    WritableFactories.setFactory(JobStatus.class, new WritableFactory() {
+      public Writable newInstance() {
+        return new JobStatus();
+      }
+    });
+  }
+
+  public static final int RUNNING = 1;
+  public static final int SUCCEEDED = 2;
+  public static final int FAILED = 3;
+  public static final int PREP = 4;
+  public static final int KILLED = 5;
+
+  private JobID jobid;
+  private float progress;
+  private float cleanupProgress;
+  private float setupProgress;
+  private int runState;
+  private long startTime;
+  private String schedulingInfo = "NA";
+
+  public JobStatus() {
+  }
+
+  public JobStatus(JobID jobid, float progress, int runState) {
+    this(jobid, progress, 0.0f, runState);
+  }
+
+  public JobStatus(JobID jobid, float progress, float cleanupProgress,
+      int runState) {
+    this(jobid, 0.0f, progress, cleanupProgress, runState);
+  }
+
+  public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+      float cleanupProgress, int runState) {
+    this.jobid = jobid;
+    this.setupProgress = setupProgress;
+    this.progress = mapProgress;
+    this.cleanupProgress = cleanupProgress;
+    this.runState = runState;
+  }
+
+  public JobID getJobID() {
+    return jobid;
+  }
+
+  public synchronized float progress() {
+    return progress;
+  }
+
+  synchronized void setprogress(float p) {
+    this.progress = (float) Math.min(1.0, Math.max(0.0, p));
+  }
+
+  public synchronized float cleanupProgress() {
+    return cleanupProgress;
+  }
+
+  synchronized void setCleanupProgress(float p) {
+    this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p));
+  }
+
+  public synchronized float setupProgress() {
+    return setupProgress;
+  }
+
+  synchronized void setSetupProgress(float p) {
+    this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p));
+  }
+
+  public synchronized int getRunState() {
+    return runState;
+  }
+
+  public synchronized void setRunState(int state) {
+    this.runState = state;
+  }
+
+  synchronized void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  synchronized public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public Object clone() {
+    try {
+      return super.clone();
+    } catch (CloneNotSupportedException cnse) {
+      throw new InternalError(cnse.toString());
+    }
+  }
+
+  public synchronized String getSchedulingInfo() {
+    return schedulingInfo;
+  }
+
+  public synchronized void setSchedulingInfo(String schedulingInfo) {
+    this.schedulingInfo = schedulingInfo;
+  }
+
+  public synchronized boolean isJobComplete() {
+    return (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED || runState == JobStatus.KILLED);
+  }
+
+  public synchronized void write(DataOutput out) throws IOException {
+    jobid.write(out);
+    out.writeFloat(setupProgress);
+    out.writeFloat(progress);
+    out.writeFloat(cleanupProgress);
+    out.writeInt(runState);
+    out.writeLong(startTime);
+    Text.writeString(out, schedulingInfo);
+  }
+
+  public synchronized void readFields(DataInput in) throws IOException {
+    this.jobid = new JobID();
+    jobid.readFields(in);
+    this.setupProgress = in.readFloat();
+    this.progress = in.readFloat();
+    this.cleanupProgress = in.readFloat();
+    this.runState = in.readInt();
+    this.startTime = in.readLong();
+    this.schedulingInfo = Text.readString(in);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java Tue Nov 17 01:21:27 2009
@@ -0,0 +1,30 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * There is one version id for all the RPC interfaces. If any interface is
+ * changed, the versionID must be changed here.
+ */
+public interface HamaRPCProtocolVersion extends VersionedProtocol {
+  public static final long versionID = 0L;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/ipc/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/HeartbeatResponse.java?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/HeartbeatResponse.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/HeartbeatResponse.java Tue Nov 17 01:21:27 2009
@@ -0,0 +1,69 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+public class HeartbeatResponse implements Writable, Configurable {
+  private Configuration conf;
+
+  private short responseId;
+
+  public HeartbeatResponse() {
+  }
+
+  public HeartbeatResponse(short responseId) {
+    this.responseId = responseId;
+  }
+
+  public void setResponseId(short responseId) {
+    this.responseId = responseId;
+  }
+
+  public short getResponseId() {
+    return responseId;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.responseId = in.readShort();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeShort(this.responseId);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java Tue Nov 17 01:21:27 2009
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+public interface InterTrackerProtocol extends HamaRPCProtocolVersion {
+  public HeartbeatResponse heartbeat(short responseId);
+
+  public String getSystemDir();
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=881080&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java Tue Nov 17 01:21:27 2009
@@ -0,0 +1,35 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.graph.JobID;
+import org.apache.hama.graph.JobStatus;
+
+/**
+ * Protocol that a Walker and the central Master use to communicate. This
+ * interface will contains several methods: submitJob, killJob, and killTask.
+ */
+public interface JobSubmissionProtocol extends HamaRPCProtocolVersion {
+  public JobID getNewJobId() throws IOException;
+
+  public JobStatus submitJob(JobID jobName) throws IOException;
+}



Mime
View raw message