singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wang...@apache.org
Subject [2/3] incubator-singa git commit: SINGA-38 Support concurrent jobs
Date Wed, 22 Jul 2015 05:13:40 GMT
SINGA-38 Support concurrent jobs

With support for concurrent jobs, SINGA can be used in the following ways:

1. Quick Run
  This is for those who just want to test SINGA or run a single job.
  * start a singa job
    $./bin/singa-run.sh -conf=CONF_DIR
  * stop singa
    $./bin/singa-stop.sh
  * remove all singa-related data
    $./bin/singa-cleanup.sh

2. Multi-Job Management
  This is for those who want to manage multiple jobs in a cluster.
  In this case you should first:
    - list the cluster hosts in conf/hostfile
    - set the ZooKeeper host in conf/singa.conf
    - set up password-less ssh from this machine to every host
  * start a singa job
    $./bin/singa-run.sh -conf=CONF_DIR
  * list/view/kill singa jobs
    $./bin/singa-console.sh list
    $./bin/singa-console.sh view JOB_ID
    $./bin/singa-console.sh kill JOB_ID
  * stop SINGA entirely (kills all jobs on all hosts)
    $./bin/singa-stop.sh
  * remove all singa-related data
    $./bin/singa-cleanup.sh

3. Zookeeper Management
  If you do not have ZooKeeper, please install it via thirdparty/install.sh.
  If you want to manage ZooKeeper externally, please ensure it is running, and set:
    $export ZK_HOME=your_zookeeper_path
    $export SINGA_MANAGES_ZK=false


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/1fa5d5f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/1fa5d5f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/1fa5d5f1

Branch: refs/heads/master
Commit: 1fa5d5f110b1c9b2feb59644c0e2fc58b8b32301
Parents: f746b99
Author: wang sheng <wangsheng1001@gmail.com>
Authored: Tue Jul 21 20:48:36 2015 +0800
Committer: wang sheng <wangsheng1001@gmail.com>
Committed: Tue Jul 21 22:33:31 2015 +0800

----------------------------------------------------------------------
 .gitignore                 |   2 +
 bin/singa-console.sh       |  81 ++++++++++++++++++
 bin/singa-run.sh           |  36 ++++----
 bin/singa-stop.sh          |   6 +-
 include/trainer/trainer.h  |   4 +-
 include/utils/cluster.h    |   8 +-
 include/utils/cluster_rt.h |  53 +++++++++---
 src/main.cc                |   9 +-
 src/trainer/trainer.cc     |  17 ++--
 src/utils/cluster.cc       |  28 ++-----
 src/utils/cluster_rt.cc    | 179 +++++++++++++++++++++++++++++++---------
 src/utils/tool.cc          |  71 ++++++++++++++--
 12 files changed, 378 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index be85b4f..9abefe5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,8 @@
 *.pb.h
 *.pb.cc
 *.hosts
+*.id
+*.tmp
 *.out
 tool/pb2/*
 src/test/data/*

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/bin/singa-console.sh
----------------------------------------------------------------------
diff --git a/bin/singa-console.sh b/bin/singa-console.sh
new file mode 100755
index 0000000..8844bca
--- /dev/null
+++ b/bin/singa-console.sh
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2015 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+# 
+# console to list/view/kill singa jobs
+#
+
+usage="Usage:\n
+       singa-console.sh list         :  list running singa jobs\n
+       singa-console.sh view JOB_ID  :  view procs of a singa job\n
+       singa-console.sh kill JOB_ID  :  kill a singa job"
+
+if [ $# == 0 ]; then
+  echo -e $usage
+  exit 1
+fi
+
+# get environment variables
+. `dirname "${BASH_SOURCE-$0}"`/singa-env.sh
+cd $SINGA_HOME
+
+case $1 in
+  list)
+    if [ $# != 1 ]; then
+      echo -e $usage
+      exit 1
+    fi
+    ./singatool list || exit 1
+    ;;
+
+  view)
+    if [ $# != 2 ]; then
+      echo -e $usage
+      exit 1
+    fi
+    ./singatool view $2 || exit 1
+    ;;
+
+  kill)
+    if [ $# != 2 ]; then
+      echo -e $usage
+      exit 1
+    fi
+    host_file="job-$2.tmp"
+    ./singatool view $2 1>$host_file || exit 1
+    ssh_options="-oStrictHostKeyChecking=no \
+             -oUserKnownHostsFile=/dev/null \
+             -oLogLevel=quiet"
+    hosts=`cat $host_file | cut -d ' ' -f 1`
+    for i in ${hosts[@]}; do
+      echo kill singa @ $i ...
+      proc=(`echo $i | tr '|' ' '`)
+      singa_kill="kill -9 "${proc[1]}
+      ssh $ssh_options ${proc[0]} $singa_kill
+    done
+    rm $host_file
+    ./singatool clean $2 || exit 1
+    ;;
+  
+  *)
+    echo -e $usage
+    exit 1
+esac

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/bin/singa-run.sh
----------------------------------------------------------------------
diff --git a/bin/singa-run.sh b/bin/singa-run.sh
index 2c282c3..a478221 100755
--- a/bin/singa-run.sh
+++ b/bin/singa-run.sh
@@ -23,12 +23,8 @@
 # run a Singa job
 #
 
-usage="Usage: singa-run.sh -conf=CONF_DIR 
-      (CONF_DIR should contain cluster.conf && model.conf)"
-# usage="Usage: \n
-#       (single process): singa-run.sh -cluster=YOUR_CONF_FILE -model=YOUR_CONF_FILE \n
-#       (multi-process): singa-run.sh -conf=YOUR_CONF_DIR 
-#       (the directory should contain cluster.conf && model.conf)"
+usage="Usage: singa-run.sh -conf=CONF_DIR
+       (CONF_DIR should contain cluster.conf && model.conf)"
 
 # check arguments
 if [ $# != 1 ] || [[ $1 != "-conf="* ]]; then
@@ -40,32 +36,40 @@ fi
 . `dirname "${BASH_SOURCE-$0}"`/singa-env.sh
 # get workspace path
 workspace=`cd "${1:6}">/dev/null; pwd`
+cluster_conf=$workspace/cluster.conf
+model_conf=$workspace/model.conf
+if [ ! -f $cluster_conf ] || [ ! -f $model_conf ]; then
+  echo cluster.conf or model.conf not exists in $workspace
+  exit 1
+fi
+cd $SINGA_HOME
 
 # start zookeeper
 if [ $SINGA_MANAGES_ZK = true ]; then
   $SINGA_BIN/zk-service.sh start || exit 1
 fi
 
-# cleanup old processes and data
-$SINGA_BIN/singa-stop.sh || exit 1
-
 # generate host file
 host_file=$workspace/job.hosts
-python $SINGA_HOME/tool/gen_hosts.py -conf=$workspace/cluster.conf \
+python $SINGA_HOME/tool/gen_hosts.py -conf=$cluster_conf \
                                      -hosts=$SINGA_CONF/hostfile \
                                      -output=$host_file \
                                      || exit 1
 
+# generate unique job id
+./singatool create 1>$workspace/job.id || exit 1
+job_id=`cat $workspace/job.id`
+echo generate job id at $workspace/job.id [job_id = $job_id]
+
 # ssh and start singa processes
 ssh_options="-oStrictHostKeyChecking=no \
 -oUserKnownHostsFile=/dev/null \
 -oLogLevel=quiet"
-hosts=`cat $host_file |cut -d ' ' -f 1`
-# cd to SINGA_HOME as it need conf/singa.conf
-cd $SINGA_HOME
-singa_run="./singa -cluster=$workspace/cluster.conf -model=$workspace/model.conf"
-singa_sshrun="cd $SINGA_HOME; ./singa -cluster=$workspace/cluster.conf \
-              -model=$workspace/model.conf"
+hosts=`cat $host_file | cut -d ' ' -f 1`
+singa_run="./singa -cluster=$cluster_conf -model=$model_conf \
+           -job=$job_id"
+singa_sshrun="cd $SINGA_HOME; $singa_run"
+
 for i in ${hosts[@]} ; do
   if [ $i = localhost ] ; then
     echo executing : $singa_run

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/bin/singa-stop.sh
----------------------------------------------------------------------
diff --git a/bin/singa-stop.sh b/bin/singa-stop.sh
index e3f24dc..dd3e7bf 100755
--- a/bin/singa-stop.sh
+++ b/bin/singa-stop.sh
@@ -34,13 +34,14 @@
 
 # get environment variables
 . `dirname "${BASH_SOURCE-$0}"`/singa-env.sh
+cd $SINGA_HOME
 
 # kill singa processes
 host_file=$SINGA_CONF/hostfile
 ssh_options="-oStrictHostKeyChecking=no \
              -oUserKnownHostsFile=/dev/null \
              -oLogLevel=quiet"
-hosts=`cat $host_file |cut -d ' ' -f 1`
+hosts=`cat $host_file | cut -d ' ' -f 1`
 singa_kill="killall -s SIGKILL -r singa"
 for i in ${hosts[@]}; do
   echo kill singa @ $i ...
@@ -56,5 +57,4 @@ sleep 2
 # remove zk data
 # singatool need global conf under SINGA_HOME
 echo cleanning metadata in zookeeper ...
-cd $SINGA_HOME
-./singatool || exit 1
+./singatool cleanup || exit 1

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 6c2b7c6..9f47ccd 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -34,8 +34,8 @@ class Trainer{
    * @param globalconf global singa configuration
    * @param cconf cluster configuration
    */
-  void Start(bool resume, int job, ModelProto& mconf,
-    const GlobalProto& gconf, const ClusterProto& cconf);
+  void Start(ModelProto& mconf, const GlobalProto& gconf,
+             const ClusterProto& cconf, int job, bool resume);
 
  protected:
   /**

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 570377f..b48c1c5 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -24,8 +24,8 @@ namespace singa {
 class Cluster {
  public:
   static shared_ptr<Cluster> Get();
-  static shared_ptr<Cluster> Get(const GlobalProto& global,
-                                 const ClusterProto& cluster, int procs_id=0);
+  static shared_ptr<Cluster> Get(const GlobalProto& global, 
+                                 const ClusterProto& cluster, int job_id);
 
   const int nserver_groups()const{ return cluster_.nserver_groups(); }
   const int nworker_groups()const { return cluster_.nworker_groups(); }
@@ -125,10 +125,10 @@ class Cluster {
   const string hostip() const {
     return hostip_;
   }
-  void Register(const string& endpoint);
+  void Register(const string& endpoint, int pid);
 
  private:
-  Cluster(const GlobalProto& global, const ClusterProto &cluster, int procs_id) ;
+  Cluster(const GlobalProto& global, const ClusterProto &cluster, int job_id);
   void SetupFolders(const ClusterProto &cluster);
   int Hash(int gid, int id, int flag);
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 55ca243..ab95adf 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -27,11 +27,11 @@ class ClusterRuntime {
    *
    * \return the process id, -1 if failed
    */
-  virtual int RegistProc(const std::string& host_addr) = 0;
+  virtual int RegistProc(const std::string& host_addr, int pid) = 0;
   /**
    * translate the process id to host address
    *
-   * \return the host and port, "" if no such proc id
+   * \return the host and port, "" if no such proc id 
    */
   virtual std::string GetProcHost(int proc_id) = 0;
   /**
@@ -49,18 +49,35 @@ class ClusterRuntime {
   virtual bool LeaveSGroup(int gid, int wid, int s_group) = 0;
 };
 
+const int kZKBufSize = 100;
+// following paths are global
 const std::string kZKPathSinga = "/singa";
-const std::string kZKPathStatus = "/singa/status";
-const std::string kZKPathRegist = "/singa/regist";
-const std::string kZKPathRegistProc = "/singa/regist/proc";
-const std::string kZKPathRegistLock = "/singa/regist/lock";
-const int kZKBufSize = 50;
+const std::string kZKPathSys =   "/singa/sys";
+const std::string kZKPathJLock = "/singa/sys/job-lock";
+const std::string kZKPathApp =   "/singa/app";
+const std::string kZKPathJob =   "/singa/app/job-";
+// following paths are local under /singa/app/job-X
+const std::string kZKPathJobGroup = "/group";
+const std::string kZKPathJobProc =  "/proc";
+const std::string kZKPathJobPLock = "/proc-lock";
+
+inline std::string GetZKJobWorkspace(int job_id) {
+  char buf[kZKBufSize];
+  sprintf(buf, "%010d", job_id);
+  return kZKPathJob + buf;
+}
 
 struct RTCallback {
   rt_callback fn;
   void* ctx;
 };
 
+struct JobInfo {
+  int id;
+  int procs;
+  std::string name;
+};
+
 class ZKService {
  public:
   static void ChildChanges(zhandle_t* zh, int type, int state,
@@ -87,12 +104,12 @@ class ZKService {
 
 class ZKClusterRT : public ClusterRuntime {
  public:
-  explicit ZKClusterRT(const std::string& host);
-  ZKClusterRT(const std::string& host, int timeout);
+  ZKClusterRT(const std::string& host, int job_id);
+  ZKClusterRT(const std::string& host, int job_id, int timeout);
   ~ZKClusterRT() override;
 
   bool Init() override;
-  int RegistProc(const std::string& host_addr) override;
+  int RegistProc(const std::string& host_addr, int pid) override;
   std::string GetProcHost(int proc_id) override;
   bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) override;
   bool JoinSGroup(int gid, int wid, int s_group) override;
@@ -100,7 +117,7 @@ class ZKClusterRT : public ClusterRuntime {
 
  private:
   inline std::string groupPath(int gid) {
-    return kZKPathStatus + "/sg" + std::to_string(gid);
+    return group_path_ + "/sg" + std::to_string(gid);
   }
   inline std::string workerPath(int gid, int wid) {
     return "/g" + std::to_string(gid) + "_w" + std::to_string(wid);
@@ -109,6 +126,10 @@ class ZKClusterRT : public ClusterRuntime {
   int timeout_ = 30000;
   std::string host_ = "";
   ZKService zk_;
+  std::string workspace_ = "";
+  std::string group_path_ = "";
+  std::string proc_path_ = "";
+  std::string proc_lock_path_ = ""; 
   std::vector<RTCallback*> cb_vec_;
 };
 
@@ -118,10 +139,16 @@ class JobManager {
   JobManager(const std::string& host, int timeout);
 
   bool Init();
-  bool Clean();
+  int GenerateJobID();
+  bool ListJobs(std::vector<JobInfo>* jobs);
+  bool ListJobProcs(int job, std::vector<std::string>* procs);
+  bool Clean(int job);
+  bool Cleanup();
 
  private:
-  bool CleanPath(const std::string& path);
+  const int kJobsNotRemoved = 10;
+
+  bool CleanPath(const std::string& path, bool remove);
 
   int timeout_ = 30000;
   std::string host_ = "";

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/src/main.cc
----------------------------------------------------------------------
diff --git a/src/main.cc b/src/main.cc
index a72a2db..cda1294 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -18,10 +18,9 @@
  * easily, e.g., AddLayer(layer_type, source_layers, meta_data).
  */
 
-// Job ID is not used now, TODO passing job id from singa-run script and
-// re-organize ClusterProto, GlobalProto and ModelProto.
-DEFINE_int32(job, -1, "Job ID");  // not used now
-DEFINE_bool(resume, false, "resume from checkpoint");
+// TODO: re-organize ClusterProto, GlobalProto and ModelProto.
+DEFINE_int32(job, -1, "Unique job ID");
+DEFINE_bool(resume, false, "Resume from checkpoint");
 DEFINE_string(cluster, "examples/mnist/cluster.conf", "Cluster config file");
 DEFINE_string(model, "examples/mnist/conv.conf", "Model config file");
 DEFINE_string(global, "conf/singa.conf", "Global config file");
@@ -54,6 +53,6 @@ int main(int argc, char **argv) {
 
   RegisterClasses(model);
   singa::Trainer trainer;
-  trainer.Start(FLAGS_resume, FLAGS_job, model, global, cluster);
+  trainer.Start(model, global, cluster, FLAGS_job, FLAGS_resume);
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 5d43e19..78ec49f 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -4,6 +4,7 @@
 #include <chrono>
 #include <glog/logging.h>
 #include "utils/tinydir.h"
+#include <unistd.h>
 #include "utils/cluster.h"
 #include "utils/common.h"
 #include "proto/common.pb.h"
@@ -229,8 +230,8 @@ void Trainer::Resume(ModelProto& mconf) {
   tinydir_close(&dir);
 }
 
-void Trainer::Start(bool resume, int job, ModelProto& mconf,
-    const GlobalProto& gconf, const ClusterProto& cconf) {
+void Trainer::Start(ModelProto& mconf, const GlobalProto& gconf,
+                    const ClusterProto& cconf, int job, bool resume){
   // register job to zookeeper at the beginning
   auto cluster=Cluster::Get(gconf, cconf, job);
 
@@ -240,14 +241,10 @@ void Trainer::Start(bool resume, int job, ModelProto& mconf,
 
   router_ = new Router();
   router_->Bind(kInprocRouterEndpoint);
-  if (cluster->nprocs() > 1) {
-    const string hostip = cluster->hostip();
-    int port = router_->Bind("tcp://" + hostip + ":*");
-    // register endpoint to zookeeper
-    cluster->Register(hostip + ":" + std::to_string(port));
-  } else {
-    cluster->set_procs_id(0);
-  }
+  const string hostip = cluster->hostip();
+  int port = router_->Bind("tcp://" + hostip + ":*");
+  // register endpoint to zookeeper
+  cluster->Register(hostip + ":" + std::to_string(port), getpid());
 
   int nthreads = 1;
   const vector<Worker*> workers = CreateWorkers(nthreads, mconf);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 3019f14..791332d 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -11,8 +11,7 @@ namespace singa {
 
 std::shared_ptr<Cluster> Cluster::instance_;
 Cluster::Cluster(const GlobalProto & global, const ClusterProto &cluster,
-                int procs_id) {
-  procs_id_=procs_id;
+                 int job_id) {
   cluster_ = cluster;
   global_ = global;
   SetupFolders(cluster);
@@ -20,16 +19,6 @@ Cluster::Cluster(const GlobalProto & global, const ClusterProto &cluster,
     nprocs_=nworker_procs()+nserver_procs();
   else
     nprocs_=std::max(nworker_procs(), nserver_procs());
-  CHECK_LT(procs_id, nprocs_);
-  if(nprocs_>1&&procs_id>-1){
-    std::ifstream ifs(cluster.hostfile(), std::ifstream::in);
-    std::string line;
-    while(std::getline(ifs, line)&&
-        endpoints_.size()< static_cast<size_t>(nprocs_)){
-      endpoints_.push_back(line);
-    }
-    CHECK_EQ(endpoints_.size(), nprocs_);
-  }
 
   // locate the process id of every worker/server
   int ngrps=cluster_.nworker_groups(), grp_size=cluster_.nworkers_per_group();
@@ -49,18 +38,19 @@ Cluster::Cluster(const GlobalProto & global, const ClusterProto &cluster,
     }
   }
 
-  auto rt=new ZKClusterRT(global_.zookeeper_host());
+  auto rt = new ZKClusterRT(global_.zookeeper_host(), job_id);
   rt->Init();
   cluster_rt_=shared_ptr<ClusterRuntime>(static_cast<ClusterRuntime*>(rt));
 
   hostip_=GetHostIP();
 }
 
-void Cluster::Register(const string& endpoint) {
-  procs_id_=cluster_rt_->RegistProc(endpoint);
+void Cluster::Register(const string& endpoint, int pid) {
+  procs_id_=cluster_rt_->RegistProc(endpoint, pid);
   CHECK_GE(procs_id_,0);
   CHECK_LT(procs_id_,nprocs());
-  LOG(ERROR) << "proc #" << procs_id_ << " -> " << endpoint;
+  LOG(ERROR) << "proc #" << procs_id_ << " -> " << endpoint
+             << " (pid = " << pid << ")";
 }
 
 const string Cluster::endpoint(int procsid) const {
@@ -79,9 +69,9 @@ void Cluster::SetupFolders(const ClusterProto &cluster){
   mkdir(checkpoint_folder().c_str(),  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
 }
 
-shared_ptr<Cluster> Cluster::Get(const GlobalProto& global, const ClusterProto&
cluster,
-                                 int procs_id){
-  instance_.reset(new Cluster(global, cluster, procs_id));
+shared_ptr<Cluster> Cluster::Get(const GlobalProto& global,
+                                 const ClusterProto& cluster, int job_id){
+  instance_.reset(new Cluster(global, cluster, job_id));
   return instance_;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 7e5a86b..c722a73 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -5,6 +5,7 @@
 
 using std::string;
 using std::to_string;
+using std::vector;
 
 namespace singa {
 
@@ -25,11 +26,11 @@ void ZKService::ChildChanges(zhandle_t *zh, int type, int state,
         cb->fn = nullptr;
       }
     } else {
-      LOG(ERROR) << "Unhandled ZK error code: " << ret
-                 << " (zoo_wget_children)";
+      LOG(FATAL) << "Unhandled ZK error code: " << ret
+                 << " (zoo_wget_children " << path << ")";
     }
   } else {
-    LOG(ERROR) << "Unhandled callback type code: "<< type;
+    LOG(FATAL) << "Unhandled callback type code: "<< type;
   }
 }
 
@@ -84,7 +85,8 @@ bool ZKService::CreateNode(const char* path, const char* val, int flag,
     LOG(WARNING) << "zookeeper node " << path << " already exists";
     return true;
   }
-  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_create " << path << ")";
   return false;
 }
 
@@ -97,7 +99,8 @@ bool ZKService::DeleteNode(const char* path) {
     LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
     return true;
   }
-  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_delete)";
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_delete " << path << ")";
   return false;
 }
 
@@ -106,7 +109,7 @@ bool ZKService::Exist(const char* path) {
   int ret = zoo_exists(zkhandle_, path, 0, &stat);
   if (ret == ZOK) return true;
   else if (ret == ZNONODE) return false;
-  //LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
+  LOG(WARNING) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
   return false;
 }
 
@@ -121,30 +124,35 @@ bool ZKService::GetNode(const char* path, char* output) {
     LOG(ERROR) << "zk node " << path << " does not exist";
     return false;
   }
-  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get)";
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+            << " (zoo_get " << path << ")";
   return false;
 }
 
-bool ZKService::GetChild(const char* path, std::vector<string>* vt) {
+bool ZKService::GetChild(const char* path, vector<string>* vt) {
   struct String_vector child;
   int ret = zoo_get_children(zkhandle_, path, 0, &child);
   if (ret == ZOK) {
+    vt->clear();
     for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
     return true;
   }
-  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get_children)";
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_get_children " << path << ")";
   return false;
 }
 
-bool ZKService::WGetChild(const char* path, std::vector<std::string>* vt,
+bool ZKService::WGetChild(const char* path, vector<string>* vt,
                             RTCallback *cb) {
   struct String_vector child;
   int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child);
   if (ret == ZOK) {
+    vt->clear();
     for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
     return true;
   }
-  LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get_children)";
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_get_children " << path << ")";
   return false;
 }
 
@@ -159,11 +167,15 @@ void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
   }
 }
 
-ZKClusterRT::ZKClusterRT(const string& host) : ZKClusterRT(host, 30000) {}
+ZKClusterRT::ZKClusterRT(const string& host, int job_id) : ZKClusterRT(host, job_id,
30000) {}
 
-ZKClusterRT::ZKClusterRT(const string& host, int timeout) {
+ZKClusterRT::ZKClusterRT(const string& host, int job_id, int timeout) {
   host_ = host;
   timeout_ = timeout;
+  workspace_ = GetZKJobWorkspace(job_id);
+  group_path_ = workspace_ + kZKPathJobGroup;
+  proc_path_ = workspace_ + kZKPathJobProc;
+  proc_lock_path_ = workspace_ + kZKPathJobPLock;
 }
 
 ZKClusterRT::~ZKClusterRT() {
@@ -175,41 +187,38 @@ ZKClusterRT::~ZKClusterRT() {
 
 bool ZKClusterRT::Init() {
   if (!zk_.Init(host_, timeout_)) return false;
-  // create kZKPathSinga
   if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
     return false;
-  // create kZKPathStatus
-  if (!zk_.CreateNode(kZKPathStatus.c_str(), nullptr, 0, nullptr))
+  if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
     return false;
-  // create kZKPathRegist
-  if (!zk_.CreateNode(kZKPathRegist.c_str(), nullptr, 0, nullptr))
+  if (!zk_.CreateNode(workspace_.c_str(), nullptr, 0, nullptr))
     return false;
-  // create kZKPathRegistProc
-  if (!zk_.CreateNode(kZKPathRegistProc.c_str(), nullptr, 0, nullptr))
+  if (!zk_.CreateNode(group_path_.c_str(), nullptr, 0, nullptr))
     return false;
-  // create kZKPathRegistLock
-  if (!zk_.CreateNode(kZKPathRegistLock.c_str(), nullptr, 0, nullptr))
+  if (!zk_.CreateNode(proc_path_.c_str(), nullptr, 0, nullptr))
+    return false;
+  if (!zk_.CreateNode(proc_lock_path_.c_str(), nullptr, 0, nullptr))
     return false;
   return true;
 }
 
-int ZKClusterRT::RegistProc(const string& host_addr) {
+int ZKClusterRT::RegistProc(const string& host_addr, int pid) {
   char buf[kZKBufSize];
-  string lock = kZKPathRegistLock+"/lock-";
+  string lock = proc_lock_path_ + "/lock-";
   if (!zk_.CreateNode(lock.c_str(), nullptr,
                         ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
     return -1;
   }
   // get all children in lock folder
-  std::vector<string> vt;
-  if (!zk_.GetChild(kZKPathRegistLock.c_str(), &vt)) {
+  vector<string> vt;
+  if (!zk_.GetChild(proc_lock_path_.c_str(), &vt)) {
     return -1;
   }
   // find own position among all locks
   int id = -1;
   std::sort(vt.begin(), vt.end());
   for (int i = 0; i < static_cast<int>(vt.size()); ++i) {
-    if (kZKPathRegistLock+"/"+vt[i] == buf) {
+    if (proc_lock_path_+"/"+vt[i] == buf) {
       id = i;
       break;
     }
@@ -219,8 +228,9 @@ int ZKClusterRT::RegistProc(const string& host_addr) {
     return -1;
   }
   // create a new node in proc path
-  string path = kZKPathRegistProc+"/proc-"+to_string(id);
-  if (!zk_.CreateNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL,
+  string path = proc_path_ + "/proc-" + to_string(id);
+  string content = host_addr + "|" + to_string(pid);
+  if (!zk_.CreateNode(path.c_str(), content.c_str(), ZOO_EPHEMERAL,
                       nullptr)) {
     return -1;
   }
@@ -232,7 +242,7 @@ bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx)
{
   string path = groupPath(gid);
   // create zk node
   if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false;
-  std::vector<string> child;
+  vector<string> child;
   // store the callback function and context for later usage
   RTCallback *cb = new RTCallback;
   cb->fn = fn;
@@ -242,12 +252,16 @@ bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void
*ctx) {
   return zk_.WGetChild(path.c_str(), &child, cb);
 }
 
-string ZKClusterRT::GetProcHost(int proc_id) {
+std::string ZKClusterRT::GetProcHost(int proc_id) {
   // char buf[kZKBufSize];
   char val[kZKBufSize];
   // construct file name
-  string path = kZKPathRegistProc+"/proc-"+to_string(proc_id);
+  string path = proc_path_+"/proc-"+to_string(proc_id);
   if (!zk_.GetNode(path.c_str(), val)) return "";
+  int len = strlen(val)-1;
+  while (len && val[len] != '|') --len;
+  CHECK(len);
+  val[len] = '\0';
   return string(val);
 }
 
@@ -270,23 +284,110 @@ JobManager::JobManager(const string& host, int timeout) {
 }
 
 bool JobManager::Init() {
-  return zk_.Init(host_, timeout_);
+  if (!zk_.Init(host_, timeout_)) return false;
+  if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
+    return false;
+  if (!zk_.CreateNode(kZKPathSys.c_str(), nullptr, 0, nullptr))
+    return false;
+  if (!zk_.CreateNode(kZKPathJLock.c_str(), nullptr, 0, nullptr))
+    return false;
+  if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
+    return false;
+  return true;
+}
+
+int JobManager::GenerateJobID() {
+  char buf[kZKBufSize];
+  string lock = kZKPathJLock + "/lock-";
+  if (!zk_.CreateNode(lock.c_str(), nullptr,
+                        ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
+    return -1;
+  }
+  return atoi(buf+strlen(buf)-10);
 }
 
-bool JobManager::Clean() {
+bool JobManager::ListJobProcs(int job, vector<string>* procs) {
+  procs->clear();
+  string job_path = GetZKJobWorkspace(job);
+  // check job path
+  if (!zk_.Exist(job_path.c_str())) {
+    LOG(ERROR) << "job " << job << " not exists";
+    return true;
+  }
+  string proc_path = job_path + kZKPathJobProc;
+  vector<string> vt;
+  // check job proc path
+  if (!zk_.GetChild(proc_path.c_str(), &vt)) {
+    return false;
+  }
+  char buf[singa::kZKBufSize];
+  for (string pname : vt){
+    pname = proc_path + "/" + pname;
+    if (!zk_.GetNode(pname.c_str(), buf)) continue;
+    std::string proc = "";
+    for (int i = 0; buf[i] != '\0'; ++i) {
+      if (buf[i] == ':') {
+        buf[i] = '\0';
+        proc += buf; 
+      }
+      else if (buf[i] == '|') {
+        proc += buf + i;
+      }
+    }
+    procs->push_back(proc);
+  }
+  if (!procs->size()) LOG(ERROR) << "job " << job << " not exists";
+  return true;
+}
+
+bool JobManager::ListJobs(vector<JobInfo>* jobs) {
+  // get all children in app path
+  jobs->clear();
+  vector<string> vt;
+  if (!zk_.GetChild(kZKPathApp.c_str(), &vt)) {
+    return false;
+  }
+  std::sort(vt.begin(), vt.end());
+  int size = static_cast<int>(vt.size());
+  vector<string> procs;
+  for (int i = 0; i < size; ++i) {
+    string path = kZKPathApp + "/" + vt[i] + kZKPathJobProc;
+    if (!zk_.GetChild(path.c_str(), &procs)) continue;
+    JobInfo job;
+    string jid = vt[i].substr(vt[i].length()-10);
+    job.id = atoi(jid.c_str());
+    job.procs = procs.size();
+    jobs->push_back(job);
+    //may need to delete it
+    if (!job.procs && (i + kJobsNotRemoved < size))
+        CleanPath(kZKPathApp + "/" + vt[i], true);
+  }
+  return true;
+}
+
+bool JobManager::Clean(int job) {
+  string path = GetZKJobWorkspace(job) + kZKPathJobProc;
+  if (zk_.Exist(path.c_str())) {
+    return CleanPath(path.c_str(), false);
+  }
+  return true;
+}
+
+bool JobManager::Cleanup() {
   if (zk_.Exist(kZKPathSinga.c_str())) {
-    return CleanPath(kZKPathSinga.c_str());
+    return CleanPath(kZKPathSinga.c_str(), true);
   }
   return true;
 }
 
-bool JobManager::CleanPath(const std::string& path) {
-  std::vector<string> child;
+bool JobManager::CleanPath(const string& path, bool remove) {
+  vector<string> child;
   if (!zk_.GetChild(path.c_str(), &child)) return false;
   for (string c : child) {
-    if (!CleanPath(path + "/" + c)) return false;
+    if (!CleanPath(path + "/" + c, true)) return false;
   }
-  return zk_.DeleteNode(path.c_str());
+  if (remove) return zk_.DeleteNode(path.c_str());
+  return true;
 }
 
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/src/utils/tool.cc
----------------------------------------------------------------------
diff --git a/src/utils/tool.cc b/src/utils/tool.cc
index 7309108..fc9f618 100644
--- a/src/utils/tool.cc
+++ b/src/utils/tool.cc
@@ -1,5 +1,7 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <iostream>
+#include <fstream>
 #include "proto/cluster.pb.h"
 #include "utils/cluster_rt.h"
 #include "utils/common.h"
@@ -20,9 +22,68 @@ int main(int argc, char **argv) {
   LOG(INFO) << "The global config is \n" << global.DebugString();
 
   singa::JobManager mng(global.zookeeper_host());
-  int ret = 0;
-  if (!mng.Init()) ret = 1;
-  if (!mng.Clean()) ret = 1;
-  if (ret) LOG(ERROR) << "errors in SingaTool!";
-  return ret;
+  std::string usage = "singatool usage:\n"
+      "./singatool create       :  generate a unique job id\n"
+      "./singatool list         :  list running singa jobs\n"
+      "./singatool view JOB_ID  :  view procs of a singa job\n"
+      "./singatool clean JOB_ID :  clean a job path in zookeeper\n"
+      "./singatool cleanup      :  clean all singa data in zookeeper\n"
+      "./singatool listall      :  list all singa jobs\n";
+  if (argc <= 1) {
+    LOG(ERROR) << usage;
+    return 1;
+  }
+  if (!mng.Init()) return 1; 
+  if (!strcmp(argv[1], "create")) {
+    int id = mng.GenerateJobID();
+    printf("%d\n", id);
+  }
+  else if (!strcmp(argv[1], "list")) {
+    std::vector<singa::JobInfo> jobs;
+    if (!mng.ListJobs(&jobs)) return 1;
+    printf("JOB ID    |NUM PROCS  \n");
+    printf("----------|-----------\n");
+    for (singa::JobInfo job : jobs) {
+      if (!job.procs) continue;
+      printf("job-%-6d|%-10d\n", job.id, job.procs);
+    }
+  }
+  else if (!strcmp(argv[1], "listall")) {
+    std::vector<singa::JobInfo> jobs;
+    if (!mng.ListJobs(&jobs)) return 1;
+    printf("JOB ID    |NUM PROCS  \n");
+    printf("----------|-----------\n");
+    for (singa::JobInfo job : jobs) {
+      printf("job-%-6d|%-10d\n", job.id, job.procs);
+    }
+  }
+  else if (!strcmp(argv[1], "view")) {
+    if (argc <= 2) {
+      LOG(ERROR) << usage;
+      return 1;
+    }
+    int id = atoi(argv[2]);
+    std::vector<std::string> procs;
+    if (!mng.ListJobProcs(id, &procs)) return 1;
+    for (std::string s : procs) {
+      printf("%s\n", s.c_str());
+    }
+  }
+  else if (!strcmp(argv[1], "clean")) {
+    if (argc <= 2) {
+      LOG(ERROR) << usage;
+      return 1;
+    }
+    int id = atoi(argv[2]);
+    if (!mng.Clean(id)) return 1;
+  }
+  else if (!strcmp(argv[1], "cleanup")) {
+    if (!mng.Cleanup()) return 1;
+  }
+  else{
+    LOG(ERROR) << usage;
+    return 1;
+  }
+  
+  return 0;
 }


Mime
View raw message