singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wang...@apache.org
Subject [1/2] incubator-singa git commit: SINGA-155 Remove zookeeper for single-process training
Date Sat, 02 Apr 2016 03:46:34 GMT
Repository: incubator-singa
Updated Branches:
  refs/heads/master b00dc32fb -> 0233049ce


SINGA-155 Remove zookeeper for single-process training

Add a single-process cluster-runtime implementation.
Split the contents of cluster_rt.h into:
  * cluster_rt.h - cluster-runtime interface and SPClusterRT impl
  * zk_service.h - zookeeper wrapper and ZKClusterRT impl
  * job_manager.h - job manager


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/adeced6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/adeced6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/adeced6b

Branch: refs/heads/master
Commit: adeced6b5ff0637ae0e2bf0817514598ae7b6f1c
Parents: b00dc32
Author: WANG Sheng <wangsheng1001@gmail.com>
Authored: Thu Mar 31 20:06:12 2016 +0800
Committer: WANG Sheng <wangsheng1001@gmail.com>
Committed: Fri Apr 1 12:59:52 2016 +0800

----------------------------------------------------------------------
 Makefile.am                       |   7 +-
 include/singa/utils/cluster_rt.h  | 136 ++-------
 include/singa/utils/job_manager.h |  79 +++++
 include/singa/utils/zk_service.h  | 116 ++++++++
 src/utils/cluster.cc              |   3 +-
 src/utils/cluster_rt.cc           | 511 +++------------------------------
 src/utils/job_manager.cc          | 271 +++++++++++++++++
 src/utils/tool.cc                 |   2 +-
 src/utils/zk_service.cc           | 326 +++++++++++++++++++++
 9 files changed, 860 insertions(+), 591 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/Makefile.am
----------------------------------------------------------------------
diff --git a/Makefile.am b/Makefile.am
index b2587a4..f185a9d 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -100,7 +100,10 @@ SINGA_SRCS := src/driver.cc \
               src/utils/param.cc \
               src/utils/updater.cc \
               src/utils/blob.cc \
-              src/utils/image_transform.cc
+              src/utils/image_transform.cc \
+              src/utils/job_manager.cc \
+              src/utils/zk_service.cc
+
 
 SINGA_HDRS := include/singa.h \
 			  include/singa/utils/math_blob.h \
@@ -118,6 +121,8 @@ SINGA_HDRS := include/singa.h \
               include/utils/tinydir.h \
               include/utils/tokenizer.h \
               include/utils/image_transform.h \
+              include/singa/utils/job_manager.h \
+              include/singa/utils/zk_service.h \
               include/server.h \
               include/worker.h \
               include/stub.h \

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/include/singa/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/singa/utils/cluster_rt.h b/include/singa/utils/cluster_rt.h
index 5de6c16..e0125be 100644
--- a/include/singa/utils/cluster_rt.h
+++ b/include/singa/utils/cluster_rt.h
@@ -22,7 +22,8 @@
 #ifndef SINGA_UTILS_CLUSTER_RT_H_
 #define SINGA_UTILS_CLUSTER_RT_H_
 
-#include <zookeeper/zookeeper.h>
+#include <map>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -30,65 +31,11 @@ namespace singa {
 
 typedef void (*rt_callback)(void *contest);
 
-const int kZKBufSize = 100;
-// following paths are global
-const std::string kZKPathSinga = "/singa";
-const std::string kZKPathSys =   "/singa/sys";
-const std::string kZKPathJLock = "/singa/sys/job-lock";
-const std::string kZKPathHostIdx = "/singa/sys/host-idx";
-const std::string kZKPathApp =   "/singa/app";
-const std::string kZKPathJob =   "/singa/app/job-";
-// following paths are local under /singa/app/job-X
-const std::string kZKPathJobGroup = "/group";
-const std::string kZKPathJobProc =  "/proc";
-const std::string kZKPathJobPLock = "/proc-lock";
-
-inline std::string GetZKJobWorkspace(int job_id) {
-  char buf[kZKBufSize];
-  snprintf(buf, kZKBufSize, "%010d", job_id);
-  return kZKPathJob + buf;
-}
-
 struct RTCallback {
   rt_callback fn;
   void* ctx;
 };
 
-struct JobInfo {
-  int id;
-  int procs;
-  std::string name;
-};
-
-/*
- * A wrapper for zookeeper service which handles error code and reconnections
- */
-class ZKService {
- public:
-  static void ChildChanges(zhandle_t* zh, int type, int state,
-                           const char *path, void* watcherCtx);
-
-  ~ZKService();
-  bool Init(const std::string& host, int timeout);
-  bool CreateNode(const char* path, const char* val, int flag, char* output);
-  bool DeleteNode(const char* path);
-  bool Exist(const char* path);
-  bool UpdateNode(const char* path, const char* val);
-  bool GetNode(const char* path, char* output);
-  bool GetChild(const char* path, std::vector<std::string>* vt);
-  bool WGetChild(const char* path, std::vector<std::string>* vt,
-                   RTCallback *cb);
-
- private:
-  const int kNumRetry = 5;
-  const int kSleepSec = 1;
-
-  static void WatcherGlobal(zhandle_t* zh, int type, int state,
-                            const char *path, void* watcherCtx);
-
-  zhandle_t* zkhandle_ = nullptr;
-};
-
 /**
  * ClusterRuntime is a runtime service that manages dynamic configuration
  * and status of the whole cluster. It mainly provides following services:
@@ -97,92 +44,63 @@ class ZKService {
  */
 class ClusterRuntime {
  public:
-  ClusterRuntime(const std::string& host, int job_id);
-  ClusterRuntime(const std::string& host, int job_id, int timeout);
-  ~ClusterRuntime();
-
+  virtual ~ClusterRuntime() = default;
   /**
    * Initialize the runtime instance
    */
-  bool Init();
+  virtual bool Init() = 0;
   /**
    * register the process, and get a unique process id
    *
    * \return the process id, -1 if failed
    */
-  int RegistProc(const std::string& host_addr, int pid);
+  virtual int RegistProc(const std::string& host_addr, int pid) = 0;
   /**
    * translate the process id to host address
    *
    * \return the host and port, "" if no such proc id 
    */
-  std::string GetProcHost(int proc_id);
+  virtual std::string GetProcHost(int proc_id) = 0;
   /**
    * Server: watch all workers in a server group,
    * will be notified when all workers have left
    */
-  bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx);
+  virtual bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) = 0;
   /**
    * Worker: join a server group (i.e. start to read/update these servers)
    */
-  bool JoinSGroup(int gid, int wid, int s_group);
+  virtual bool JoinSGroup(int gid, int wid, int s_group) = 0;
   /**
    * Worker: leave a server group (i.e. finish its all work)
    */
-  bool LeaveSGroup(int gid, int wid, int s_group);
-
- private:
-  inline std::string groupPath(int gid) {
-    return group_path_ + "/sg" + std::to_string(gid);
-  }
-  inline std::string workerPath(int gid, int wid) {
-    return "/g" + std::to_string(gid) + "_w" + std::to_string(wid);
-  }
-
-  int timeout_ = 30000;
-  std::string host_ = "";
-  ZKService zk_;
-  std::string workspace_ = "";
-  std::string group_path_ = "";
-  std::string proc_path_ = "";
-  std::string proc_lock_path_ = "";
-  std::vector<RTCallback*> cb_vec_;
+  virtual bool LeaveSGroup(int gid, int wid, int s_group) = 0;
 };
 
-class JobManager {
+/*
+ * A ClusterRuntime implementation for a single-process environment.
+ * All bookkeeping is kept in memory, so no zookeeper service is needed.
+ */
+class SPClusterRT : public ClusterRuntime {
  public:
-  // host is comma separated host:port pairs, each corresponding to a zk server.
-  // e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-  explicit JobManager(const std::string& host);
-  JobManager(const std::string& host, int timeout);
+  SPClusterRT() = default;
+  ~SPClusterRT() override;
+  // the destructor deletes the stored RTCallback objects, so a copy
+  // would free them twice; copying is therefore disabled
+  SPClusterRT(const SPClusterRT&) = delete;
+  SPClusterRT& operator=(const SPClusterRT&) = delete;
 
-  // NOTICE: Init must be called once, before start to use other functions
-  bool Init();
-  // generate a unique job id
-  bool GenerateJobID(int* id);
-  // generate a list of hosts for a job conf
-  bool GenerateHostList(const char* host_file, const char* job_file,
-                        std::vector<std::string>* list);
-  // list all jobs recorded in zk
-  bool ListJobs(std::vector<JobInfo>* jobs);
-  // list running processes for a job
-  bool ListJobProcs(int job, std::vector<std::string>* procs);
-  // remove a job path in zk
-  bool Remove(int job);
-  // remove all job paths in zk
-  bool RemoveAllJobs();
-  // remove all singa related paths in zk
-  bool CleanUp();
+  bool Init() override;
+  int RegistProc(const std::string& host_addr, int pid) override;
+  std::string GetProcHost(int proc_id) override;
+  bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) override;
+  bool JoinSGroup(int gid, int wid, int s_group) override;
+  bool LeaveSGroup(int gid, int wid, int s_group) override;
 
  private:
-  const int kJobsNotRemoved = 10;
-
-  bool CleanPath(const std::string& path, bool remove);
-  std::string ExtractClusterConf(const char* job_file);
-
-  int timeout_ = 30000;
-  std::string host_ = "";
-  ZKService zk_;
+  std::vector<std::string> proc_list_;  // indexed by process id
+  std::map<int, std::vector<RTCallback*>> grp_callbacks_;  // owned
+  std::map<int, int> grp_count_;  // number of workers currently in a group
+  std::mutex lock_;  // guards updates to the containers above
 };
 
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/include/singa/utils/job_manager.h
----------------------------------------------------------------------
diff --git a/include/singa/utils/job_manager.h b/include/singa/utils/job_manager.h
new file mode 100644
index 0000000..7f1b4f1
--- /dev/null
+++ b/include/singa/utils/job_manager.h
@@ -0,0 +1,79 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+* 
+*   http://www.apache.org/licenses/LICENSE-2.0
+* 
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#ifndef SINGA_UTILS_JOB_MANAGER_H_
+#define SINGA_UTILS_JOB_MANAGER_H_
+
+#include <string>
+#include <vector>
+
+#ifdef USE_ZOOKEEPER
+#include "singa/utils/zk_service.h"
+#endif
+
+namespace singa {
+
+struct JobInfo {
+  int id;     // job id
+  int procs;  // number of processes registered under the job
+  std::string name;  // NOTE(review): the removed ListJobs never set this; confirm
+};
+
+class JobManager {
+ public:
+  // host: comma-separated host:port pairs, one per zookeeper server,
+  // e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+  explicit JobManager(const std::string& host);
+
+  // NOTICE: call Init() exactly once, before using any other member function
+  bool Init();
+  // generate a unique job id and store it in *id
+  bool GenerateJobID(int* id);
+  // fill *list with hosts from host_file for the processes job_file needs
+  bool GenerateHostList(const char* host_file, const char* job_file,
+                        std::vector<std::string>* list);
+  // list all jobs recorded in zookeeper
+  bool ListJobs(std::vector<JobInfo>* jobs);
+  // list the running processes of a job
+  bool ListJobProcs(int job, std::vector<std::string>* procs);
+  // clean up the zookeeper records of one job
+  bool Remove(int job);
+  // clean up the zookeeper records of all jobs
+  bool RemoveAllJobs();
+  // remove every singa-related path from zookeeper
+  bool CleanUp();
+
+ private:
+  const int kJobsNotRemoved = 10;  // presumably: newest jobs ListJobs keeps
+
+  bool CleanPath(const std::string& path, bool remove);
+  std::string ExtractClusterConf(const char* job_file);
+
+  std::string host_ = "";
+#ifdef USE_ZOOKEEPER
+  int timeout_ = 30000;  // session timeout (ms) passed to ZKService::Init
+  ZKService zk_;
+#endif
+};
+
+}  // namespace singa
+
+#endif  // SINGA_UTILS_JOB_MANAGER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/include/singa/utils/zk_service.h
----------------------------------------------------------------------
diff --git a/include/singa/utils/zk_service.h b/include/singa/utils/zk_service.h
new file mode 100644
index 0000000..789215b
--- /dev/null
+++ b/include/singa/utils/zk_service.h
@@ -0,0 +1,125 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+* 
+*   http://www.apache.org/licenses/LICENSE-2.0
+* 
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#ifndef SINGA_UTILS_ZK_SERVICE_H_
+#define SINGA_UTILS_ZK_SERVICE_H_
+
+#include <zookeeper/zookeeper.h>
+#include <cstdio>
+#include <string>
+#include <vector>
+
+#include "singa/utils/cluster_rt.h"
+
+namespace singa {
+
+const int kZKBufSize = 100;
+// following paths are global
+const std::string kZKPathSinga = "/singa";
+const std::string kZKPathSys =   "/singa/sys";
+const std::string kZKPathJLock = "/singa/sys/job-lock";
+const std::string kZKPathHostIdx = "/singa/sys/host-idx";
+const std::string kZKPathApp =   "/singa/app";
+const std::string kZKPathJob =   "/singa/app/job-";
+// following paths are local under /singa/app/job-X
+const std::string kZKPathJobGroup = "/group";
+const std::string kZKPathJobProc =  "/proc";
+const std::string kZKPathJobPLock = "/proc-lock";
+
+// returns the zookeeper workspace of a job: kZKPathJob followed by the
+// job id zero-padded to 10 digits, e.g. "/singa/app/job-0000000042"
+inline std::string GetZKJobWorkspace(int job_id) {
+  char buf[kZKBufSize];
+  snprintf(buf, kZKBufSize, "%010d", job_id);
+  return kZKPathJob + buf;
+}
+
+/*
+ * A wrapper for the zookeeper service that handles error codes and retries
+ */
+class ZKService {
+ public:
+  // child watcher registered by WGetChild; watcherCtx is the RTCallback
+  // to invoke once the watched node has no children left
+  static void ChildChanges(zhandle_t* zh, int type, int state,
+                           const char *path, void* watcherCtx);
+
+  ZKService() = default;
+  // the destructor closes zkhandle_, so a copy would close it twice
+  ZKService(const ZKService&) = delete;
+  ZKService& operator=(const ZKService&) = delete;
+  ~ZKService();
+  bool Init(const std::string& host, int timeout);
+  bool CreateNode(const char* path, const char* val, int flag, char* output);
+  bool DeleteNode(const char* path);
+  bool Exist(const char* path);
+  bool UpdateNode(const char* path, const char* val);
+  bool GetNode(const char* path, char* output);
+  bool GetChild(const char* path, std::vector<std::string>* vt);
+  bool WGetChild(const char* path, std::vector<std::string>* vt,
+                   RTCallback *cb);
+
+ private:
+  const int kNumRetry = 5;
+  const int kSleepSec = 1;
+
+  static void WatcherGlobal(zhandle_t* zh, int type, int state,
+                            const char *path, void* watcherCtx);
+
+  zhandle_t* zkhandle_ = nullptr;
+};
+
+/*
+ * A ClusterRuntime implementation using zookeeper
+ */
+class ZKClusterRT : public ClusterRuntime {
+ public:
+  ZKClusterRT(const std::string& host, int job_id);
+  ~ZKClusterRT() override;
+
+  bool Init() override;
+  int RegistProc(const std::string& host_addr, int pid) override;
+  std::string GetProcHost(int proc_id) override;
+  bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) override;
+  bool JoinSGroup(int gid, int wid, int s_group) override;
+  bool LeaveSGroup(int gid, int wid, int s_group) override;
+
+ private:
+  inline std::string groupPath(int gid) {
+    return group_path_ + "/sg" + std::to_string(gid);
+  }
+  inline std::string workerPath(int gid, int wid) {
+    return "/g" + std::to_string(gid) + "_w" + std::to_string(wid);
+  }
+
+  int timeout_ = 30000;
+  std::string host_ = "";
+  ZKService zk_;
+  std::string workspace_ = "";
+  std::string group_path_ = "";
+  std::string proc_path_ = "";
+  std::string proc_lock_path_ = "";
+  std::vector<RTCallback*> cb_vec_;
+};
+
+}  // namespace singa
+
+#endif  // SINGA_UTILS_ZK_SERVICE_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 5a8b87a..53c0444 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -81,7 +81,8 @@ void Cluster::Init(int job, const SingaProto& singaConf,
           (i * grp_size + j) / cluster_.nservers_per_procs() + offset;
     }
   }
-  cluster_rt_ = new ClusterRuntime(singa_.zookeeper_host(), job);
+  // cluster_rt_ = new ZKClusterRT(singa_.zookeeper_host(), job);
+  cluster_rt_ = new SPClusterRT();
   cluster_rt_->Init();
   hostip_ = GetHostIP();
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index cdf8aab..33e04dc 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -35,505 +35,64 @@ using std::vector;
 
 namespace singa {
 
-void ZKService::ChildChanges(zhandle_t *zh, int type, int state,
-                               const char *path, void *watcherCtx) {
-  // check if already callback
-  RTCallback *cb = static_cast<RTCallback*>(watcherCtx);
-  if (cb->fn == nullptr) return;
-  if (type == ZOO_CHILD_EVENT) {
-    struct String_vector child;
-    // check the child list and put another watcher
-    int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child);
-    if (ret == ZOK) {
-      if (child.count == 0) {
-        LOG(INFO) << "child.count = 0 in path: " << path;
-        // all workers leave, we do callback now
-        (*cb->fn)(cb->ctx);
-        cb->fn = nullptr;
-      }
-    } else {
-      LOG(FATAL) << "Unhandled ZK error code: " << ret
-                 << " (zoo_wget_children " << path << ")";
-    }
-  } else {
-    LOG(FATAL) << "Unhandled callback type code: "<< type;
-  }
-}
-
-ZKService::~ZKService() {
-  // close zookeeper handler
-  zookeeper_close(zkhandle_);
-}
-
-char zk_cxt[] = "ClusterRuntime";
-
-bool ZKService::Init(const string& host, int timeout) {
-  zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
-  zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, 0,
-                             static_cast<void *>(zk_cxt), 0);
-  if (zkhandle_ == NULL) {
-    LOG(ERROR) << "Error when connecting to zookeeper servers...";
-    LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
-    LOG(ERROR) << host.c_str();
-    return false;
-  }
-
-  return true;
-}
-
-bool ZKService::CreateNode(const char* path, const char* val, int flag,
-                               char* output) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  char buf[kZKBufSize];
-  int ret = 0;
-  // send the zk request
-  for (int i = 0; i < kNumRetry; ++i) {
-    ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val),
-                     &ZOO_OPEN_ACL_UNSAFE, flag, buf, kZKBufSize);
-    if (ret == ZNONODE) {
-      LOG(WARNING) << "zookeeper parent node of " << path
-                  << " not exist, retry later";
-    } else if (ret == ZCONNECTIONLOSS) {
-      LOG(WARNING) << "zookeeper disconnected, retry later";
-    } else {
-      break;
-    }
-    sleep(kSleepSec);
-  }
-  // copy the node name to output
-  if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) {
-    snprintf(output, kZKBufSize, "%s", buf);
-    // use snprintf instead of strcpy
-    // strcpy(output, buf);
-  }
-  if (ret == ZOK) {
-    LOG(INFO) << "created zookeeper node " << buf
-              << " (" << (val == nullptr ? "NULL" : val) << ")";
-    return true;
-  } else if (ret == ZNODEEXISTS) {
-    LOG(WARNING) << "zookeeper node " << path << " already exists";
-    return true;
-  } else if (ret == ZCONNECTIONLOSS) {
-    LOG(ERROR) << "Cannot connect to zookeeper, "
-               << "please ensure it is running properly...\n"
-               << "If want to use zookeeper in our thirdparty folder, "
-               << "you can start it by:\n"
-               << "$ ./bin/zk-service.sh start";
-    return false;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_create " << path << ")";
-  return false;
-}
-
-bool ZKService::DeleteNode(const char* path) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  int ret = zoo_delete(zkhandle_, path, -1);
-  if (ret == ZOK) {
-    LOG(INFO) << "deleted zookeeper node " << path;
-    return true;
-  } else if (ret == ZNONODE) {
-    LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
-    return true;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_delete " << path << ")";
-  return false;
-}
-
-bool ZKService::Exist(const char* path) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  struct Stat stat;
-  int ret = zoo_exists(zkhandle_, path, 0, &stat);
-  if (ret == ZOK) return true;
-  else if (ret == ZNONODE) return false;
-  LOG(WARNING) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
-  return false;
-}
-
-bool ZKService::UpdateNode(const char* path, const char* val) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  // set version = -1, do not check content version
-  int ret = zoo_set(zkhandle_, path, val, strlen(val), -1);
-  if (ret == ZOK) {
-    return true;
-  } else if (ret == ZNONODE) {
-    LOG(ERROR) << "zk node " << path << " does not exist";
-    return false;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_get " << path << ")";
-  return false;
-}
-
-bool ZKService::GetNode(const char* path, char* output) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  struct Stat stat;
-  int val_len = kZKBufSize;
-  int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat);
-  if (ret == ZOK) {
-    output[val_len] = '\0';
-    return true;
-  } else if (ret == ZNONODE) {
-    LOG(ERROR) << "zk node " << path << " does not exist";
-    return false;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_get " << path << ")";
-  return false;
-}
-
-bool ZKService::GetChild(const char* path, vector<string>* vt) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  struct String_vector child;
-  int ret = zoo_get_children(zkhandle_, path, 0, &child);
-  if (ret == ZOK) {
-    vt->clear();
-    for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
-    return true;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_get_children " << path << ")";
-  return false;
-}
-
-bool ZKService::WGetChild(const char* path, vector<string>* vt,
-                            RTCallback *cb) {
-  CHECK(zkhandle_) << "zk handler not initialized";
-  struct String_vector child;
-  int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child);
-  if (ret == ZOK) {
-    vt->clear();
-    for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
-    return true;
-  }
-  LOG(FATAL) << "Unhandled ZK error code: " << ret
-             << " (zoo_get_children " << path << ")";
-  return false;
-}
-
-
-void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
-                                const char *path, void *watcherCtx) {
-  if (type == ZOO_SESSION_EVENT) {
-    if (state == ZOO_CONNECTED_STATE)
-      LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!";
-    else if (state == ZOO_EXPIRED_SESSION_STATE)
-      LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
-  }
-}
-
-ClusterRuntime::ClusterRuntime(const string& host, int job_id)
-    : ClusterRuntime(host, job_id, 30000) {}
-
-ClusterRuntime::ClusterRuntime(const string& host, int job_id, int timeout) {
-  host_ = host;
-  timeout_ = timeout;
-  workspace_ = GetZKJobWorkspace(job_id);
-  group_path_ = workspace_ + kZKPathJobGroup;
-  proc_path_ = workspace_ + kZKPathJobProc;
-  proc_lock_path_ = workspace_ + kZKPathJobPLock;
-}
-
-ClusterRuntime::~ClusterRuntime() {
+SPClusterRT::~SPClusterRT() {
   // release callback vector
-  for (RTCallback* p : cb_vec_) {
+  for (const auto& group : grp_callbacks_)  // by reference: no vector copies
+    for (RTCallback* p : group.second) {
     delete p;
   }
 }
 
-bool ClusterRuntime::Init() {
-  if (!zk_.Init(host_, timeout_)) return false;
-  if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(workspace_.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(group_path_.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(proc_path_.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(proc_lock_path_.c_str(), nullptr, 0, nullptr))
-    return false;
+bool SPClusterRT::Init() {
   return true;
 }
 
-int ClusterRuntime::RegistProc(const string& host_addr, int pid) {
-  char buf[kZKBufSize];
-  string lock = proc_lock_path_ + "/lock-";
-  if (!zk_.CreateNode(lock.c_str(), nullptr,
-                        ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
-    return -1;
-  }
-  // get all children in lock folder
-  vector<string> vt;
-  if (!zk_.GetChild(proc_lock_path_.c_str(), &vt)) {
-    return -1;
-  }
-  // find own position among all locks
-  int id = -1;
-  std::sort(vt.begin(), vt.end());
-  for (int i = 0; i < static_cast<int>(vt.size()); ++i) {
-    if (proc_lock_path_+"/"+vt[i] == buf) {
-      id = i;
-      break;
-    }
-  }
-  if (id == -1) {
-    LOG(ERROR) << "cannot find own node " << buf;
-    return -1;
-  }
-  // create a new node in proc path
-  string path = proc_path_ + "/proc-" + to_string(id);
-  string content = host_addr + "|" + to_string(pid);
-  if (!zk_.CreateNode(path.c_str(), content.c_str(), ZOO_EPHEMERAL,
-                      nullptr)) {
-    return -1;
-  }
-  return id;
+int SPClusterRT::RegistProc(const string& host_addr, int pid) {
+  std::lock_guard<std::mutex> guard(lock_);
+  proc_list_.push_back(host_addr);  // store only "host:port", not the pid
+  return static_cast<int>(proc_list_.size()) - 1;  // the new process id
+}
+
+// returns "" for unknown ids, including negative ones
+string SPClusterRT::GetProcHost(int proc_id) {
+  std::lock_guard<std::mutex> guard(lock_);
+  if (proc_id < 0 || static_cast<size_t>(proc_id) >= proc_list_.size())
+    return "";
+  return proc_list_[proc_id];
 }
 
-bool ClusterRuntime::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
-  CHECK_NOTNULL(fn);
-  string path = groupPath(gid);
-  // create zk node
-  if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false;
-  vector<string> child;
+bool SPClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) {
   // store the callback function and context for later usage
   RTCallback *cb = new RTCallback;
   cb->fn = fn;
   cb->ctx = ctx;
-  cb_vec_.push_back(cb);
-  // start to watch on the zk node, does not care about the first return value
-  return zk_.WGetChild(path.c_str(), &child, cb);
-}
-
-std::string ClusterRuntime::GetProcHost(int proc_id) {
-  char val[kZKBufSize];
-  // construct file name
-  string path = proc_path_ + "/proc-" + to_string(proc_id);
-  if (!zk_.GetNode(path.c_str(), val)) return "";
-  int len = strlen(val) - 1;
-  while (len && val[len] != '|') --len;
-  CHECK(len);
-  val[len] = '\0';
-  return string(val);
-}
-
-bool ClusterRuntime::JoinSGroup(int gid, int wid, int s_group) {
-  string path = groupPath(s_group) + workerPath(gid, wid);
-  // try to create an ephemeral node under server group path
-  return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr);
-}
-
-bool ClusterRuntime::LeaveSGroup(int gid, int wid, int s_group) {
-  string path = groupPath(s_group) + workerPath(gid, wid);
-  return zk_.DeleteNode(path.c_str());
-}
-
-JobManager::JobManager(const string& host) : JobManager(host, 30000) {}
-
-JobManager::JobManager(const string& host, int timeout) {
-  host_ = host;
-  timeout_ = timeout;
-}
-
-bool JobManager::Init() {
-  if (!zk_.Init(host_, timeout_)) return false;
-  if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(kZKPathSys.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(kZKPathJLock.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(kZKPathHostIdx.c_str(), "0", 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
-    return false;
+  CHECK_NOTNULL(fn);  // reject null callbacks, as the zookeeper version did
+  std::lock_guard<std::mutex> guard(lock_);  // exception-safe unlock
+  // operator[] default-constructs an empty list the first time a group
+  // is watched, so no explicit existence check is needed
+  grp_callbacks_[gid].push_back(cb);
   return true;
 }
 
-bool JobManager::GenerateJobID(int* id) {
-  char buf[kZKBufSize];
-  string lock = kZKPathJLock + "/lock-";
-  if (!zk_.CreateNode(lock.c_str(), nullptr,
-                        ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
-    return false;
-  }
-  *id = atoi(buf + strlen(buf) - 10);
+bool SPClusterRT::JoinSGroup(int gid, int wid, int s_group) {
+  // workers are counted per server group (s_group): that is the id the
+  // servers watch via WatchSGroup and the one LeaveSGroup decrements
+  std::lock_guard<std::mutex> guard(lock_);
+  // operator[] value-initializes a missing counter to 0
+  ++grp_count_[s_group];
   return true;
 }
 
-bool JobManager::GenerateHostList(const char* host_file, const char* job_file,
-                                  vector<string>* list) {
-  int nprocs = 1;
-  // compute required #process from job conf
-  if (job_file != nullptr) {
-    ClusterProto cluster;
-    google::protobuf::TextFormat::ParseFromString(ExtractClusterConf(job_file),
-                                                  &cluster);
-    int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group()
-                        / cluster.nworkers_per_procs();
-    int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group()
-                        / cluster.nservers_per_procs();
-    if (cluster.server_worker_separate())
-      nprocs = nworker_procs + nserver_procs;
-    else
-      nprocs = std::max(nworker_procs, nserver_procs);
-  }
-  // get available host list from global conf
-  std::ifstream hostfile(host_file);
-  if (!hostfile.is_open()) {
-    LOG(FATAL) << "Cannot open file: " << host_file;
-  }
-  vector<string> hosts;
-  string host;
-  while (!hostfile.eof()) {
-    getline(hostfile, host);
-    if (!host.length() || host[0] == '#') continue;
-    hosts.push_back(host);
-  }
-  if (!hosts.size()) {
-    LOG(FATAL) << "Empty host file";
-  }
-  // read next host index
-  char val[kZKBufSize];
-  if (!zk_.GetNode(kZKPathHostIdx.c_str(), val)) return false;
-  int next = atoi(val);
-  // generate host list
-  list->clear();
-  for (int i = 0; i < nprocs; ++i) {
-    list->push_back(hosts[(next + i) % hosts.size()]);
-  }
-  // write next host index
-  next = (next + nprocs) % hosts.size();
-  snprintf(val, kZKBufSize, "%d", next);
-  if (!zk_.UpdateNode(kZKPathHostIdx.c_str(), val)) return false;
-  return true;
-}
-
-bool JobManager::ListJobProcs(int job, vector<string>* procs) {
-  procs->clear();
-  string job_path = GetZKJobWorkspace(job);
-  // check job path
-  if (!zk_.Exist(job_path.c_str())) {
-    LOG(ERROR) << "job " << job << " not exists";
-    return true;
-  }
-  string proc_path = job_path + kZKPathJobProc;
-  vector<string> vt;
-  // check job proc path
-  if (!zk_.GetChild(proc_path.c_str(), &vt)) {
-    return false;
-  }
-  char buf[singa::kZKBufSize];
-  for (string pname : vt) {
-    pname = proc_path + "/" + pname;
-    if (!zk_.GetNode(pname.c_str(), buf)) continue;
-    std::string proc = "";
-    for (int i = 0; buf[i] != '\0'; ++i) {
-      if (buf[i] == ':') {
-        buf[i] = '\0';
-        proc += buf;
-      } else if (buf[i] == '|') {
-        proc += buf + i;
+bool SPClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
+  std::lock_guard<std::mutex> guard(lock_);  // released even if fn throws
+  if (--grp_count_[s_group] == 0) {  // the last worker of s_group has left
+      for (RTCallback* cb : grp_callbacks_[s_group]) {
+        if (cb->fn != nullptr) (*cb->fn)(cb->ctx);  // fire each callback once
+        cb->fn = nullptr;
       }
-    }
-    procs->push_back(proc);
   }
-  if (!procs->size()) LOG(ERROR) << "job " << job << " not exists";
+  // callbacks run while lock_ (non-recursive) is held: they must not re-enter
   return true;
 }
 
-bool JobManager::ListJobs(vector<JobInfo>* jobs) {
-  // get all children in app path
-  jobs->clear();
-  vector<string> vt;
-  if (!zk_.GetChild(kZKPathApp.c_str(), &vt)) {
-    return false;
-  }
-  std::sort(vt.begin(), vt.end());
-  int size = static_cast<int>(vt.size());
-  vector<string> procs;
-  for (int i = 0; i < size; ++i) {
-    string path = kZKPathApp + "/" + vt[i] + kZKPathJobProc;
-    if (!zk_.GetChild(path.c_str(), &procs)) continue;
-    JobInfo job;
-    string jid = vt[i].substr(vt[i].length()-10);
-    job.id = atoi(jid.c_str());
-    job.procs = procs.size();
-    jobs->push_back(job);
-    // may need to delete it
-    if (!job.procs && (i + kJobsNotRemoved < size))
-        CleanPath(kZKPathApp + "/" + vt[i], true);
-  }
-  return true;
-}
-
-bool JobManager::Remove(int job) {
-  string path = GetZKJobWorkspace(job) + kZKPathJobProc;
-  if (zk_.Exist(path.c_str())) {
-    return CleanPath(path.c_str(), false);
-  }
-  return true;
-}
-
-bool JobManager::RemoveAllJobs() {
-  if (zk_.Exist(kZKPathApp.c_str())) {
-    return CleanPath(kZKPathApp.c_str(), false);
-  }
-  return true;
-}
-
-bool JobManager::CleanUp() {
-  if (zk_.Exist(kZKPathSinga.c_str())) {
-    return CleanPath(kZKPathSinga.c_str(), true);
-  }
-  return true;
-}
-
-bool JobManager::CleanPath(const string& path, bool remove) {
-  vector<string> child;
-  if (!zk_.GetChild(path.c_str(), &child)) return false;
-  for (string c : child) {
-    if (!CleanPath(path + "/" + c, true)) return false;
-  }
-  if (remove) return zk_.DeleteNode(path.c_str());
-  return true;
-}
-
-// extract cluster configuration part from the job config file
-// TODO(wangsh) improve this function to make it robust
-string JobManager::ExtractClusterConf(const char* job_file) {
-  std::ifstream fin(job_file);
-  CHECK(fin.is_open()) << "cannot open job conf file " << job_file;
-  string line;
-  string cluster;
-  bool in_cluster = false;
-  while (!fin.eof()) {
-    std::getline(fin, line);
-    if (in_cluster == false) {
-      size_t pos = line.find("cluster");
-      if (pos == std::string::npos) continue;
-      in_cluster = true;
-      line = line.substr(pos);
-      cluster = "";
-    }
-    if (in_cluster == true) {
-      cluster += line + "\n";
-      if (line.find("}") != std::string::npos)
-        in_cluster = false;
-    }
-  }
-  LOG(INFO) << "cluster configure: " << cluster;
-  size_t s_pos = cluster.find("{");
-  size_t e_pos = cluster.find("}");
-  if (s_pos == std::string::npos || e_pos == std::string::npos) {
-    LOG(FATAL) << "cannot extract valid cluster configuration in file: "
-               << job_file;
-  }
-  return cluster.substr(s_pos + 1, e_pos - s_pos-1);
-}
-
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/src/utils/job_manager.cc
----------------------------------------------------------------------
diff --git a/src/utils/job_manager.cc b/src/utils/job_manager.cc
new file mode 100644
index 0000000..2ea5b1b
--- /dev/null
+++ b/src/utils/job_manager.cc
@@ -0,0 +1,271 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+* 
+*   http://www.apache.org/licenses/LICENSE-2.0
+* 
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include "singa/utils/job_manager.h"
+
+#include <glog/logging.h>
+#include <google/protobuf/text_format.h>
+#include <stdlib.h>
+#include <algorithm>
+#include <fstream>
+#include <iostream>
+#include "singa/proto/job.pb.h"
+
+using std::string;
+using std::vector;
+
+namespace singa {
+
+// Remembers the zookeeper host string; the constructor itself does not
+// connect (see Init()).
+JobManager::JobManager(const string& host) : host_(host) {}
+
+/**
+ * Connects to zookeeper and makes sure the SINGA system znodes exist,
+ * creating them parent-first (the host index node starts at "0").
+ * Without USE_ZOOKEEPER this does nothing and succeeds.
+ * @return false if connecting or creating any of the znodes fails.
+ */
+bool JobManager::Init() {
+#ifdef USE_ZOOKEEPER
+  if (!zk_.Init(host_, timeout_)) return false;
+  struct ZNode {
+    const char* path;
+    const char* value;
+  };
+  const ZNode nodes[] = {
+      {kZKPathSinga.c_str(), nullptr},
+      {kZKPathSys.c_str(), nullptr},
+      {kZKPathJLock.c_str(), nullptr},
+      {kZKPathHostIdx.c_str(), "0"},
+      {kZKPathApp.c_str(), nullptr},
+  };
+  for (const ZNode& node : nodes) {
+    if (!zk_.CreateNode(node.path, node.value, 0, nullptr)) return false;
+  }
+#endif
+  return true;
+}
+
+// Allocates a job id into *id. With zookeeper, an ephemeral-sequential lock
+// node is created and the id is parsed from the last 10 characters of the
+// created node name (zookeeper's sequence suffix). Without zookeeper every
+// job gets id 0. Returns false if the lock node cannot be created.
+bool JobManager::GenerateJobID(int* id) {
+#ifdef USE_ZOOKEEPER
+  const string prefix = kZKPathJLock + "/lock-";
+  const int flags = ZOO_EPHEMERAL | ZOO_SEQUENCE;
+  char created[kZKBufSize];
+  if (!zk_.CreateNode(prefix.c_str(), nullptr, flags, created))
+    return false;
+  const char* seq = created + strlen(created) - 10;
+  *id = atoi(seq);
+#else
+  *id = 0;
+#endif
+  return true;
+}
+
+// Builds the list of hosts on which the job's processes will be launched.
+//
+// The number of processes is derived from the cluster section of job_file
+// (1 when job_file is nullptr). With zookeeper, hosts are picked round-robin
+// from host_file starting at the index stored in kZKPathHostIdx, and that
+// index is advanced for the next job. Without zookeeper only single-process
+// jobs are supported and the list is {"localhost"}.
+// @param host_file file with one host per line; lines starting with '#' and
+//                  empty lines are skipped (CRLF line endings tolerated)
+// @param job_file  job configuration file, may be nullptr
+// @param list      output, cleared first
+// @return false if reading or updating the host index in zookeeper fails
+bool JobManager::GenerateHostList(const char* host_file, const char* job_file,
+                                  vector<string>* list) {
+  int nprocs = 1;
+  list->clear();
+  // compute required #process from job conf
+  if (job_file != nullptr) {
+    ClusterProto cluster;
+    const string conf = ExtractClusterConf(job_file);
+    if (!google::protobuf::TextFormat::ParseFromString(conf, &cluster)) {
+      LOG(ERROR) << "failed to parse cluster configuration in " << job_file;
+    }
+    // the divisions below would be undefined behavior for a zero divisor
+    CHECK_GT(cluster.nworkers_per_procs(), 0)
+        << "nworkers_per_procs must be positive";
+    CHECK_GT(cluster.nservers_per_procs(), 0)
+        << "nservers_per_procs must be positive";
+    int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group()
+                        / cluster.nworkers_per_procs();
+    int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group()
+                        / cluster.nservers_per_procs();
+    if (cluster.server_worker_separate())
+      nprocs = nworker_procs + nserver_procs;
+    else
+      nprocs = std::max(nworker_procs, nserver_procs);
+  }
+#ifdef USE_ZOOKEEPER
+  // get available host list from global conf
+  std::ifstream hostfile(host_file);
+  if (!hostfile.is_open()) {
+    LOG(FATAL) << "Cannot open file: " << host_file;
+  }
+  vector<string> hosts;
+  string host;
+  // getline-driven loop also terminates on read errors, not only at EOF
+  while (std::getline(hostfile, host)) {
+    // a host file with Windows line endings would otherwise yield "name\r"
+    if (!host.empty() && host.back() == '\r') host.pop_back();
+    if (host.empty() || host[0] == '#') continue;
+    hosts.push_back(host);
+  }
+  if (hosts.empty()) {
+    LOG(FATAL) << "Empty host file";
+  }
+  // read next host index
+  char val[kZKBufSize];
+  if (!zk_.GetNode(kZKPathHostIdx.c_str(), val)) return false;
+  int next = atoi(val);
+  // generate host list
+  for (int i = 0; i < nprocs; ++i) {
+    list->push_back(hosts[(next + i) % hosts.size()]);
+  }
+  // write next host index
+  next = (next + nprocs) % hosts.size();
+  snprintf(val, kZKBufSize, "%d", next);
+  if (!zk_.UpdateNode(kZKPathHostIdx.c_str(), val)) return false;
+#else
+  CHECK_EQ(nprocs, 1) << "To run multi-process job, please enable zookeeper";
+  list->push_back("localhost");
+#endif
+  return true;
+}
+
+// Lists the processes registered for `job`, each as "<host>|<pid>".
+// Process znodes hold "<host_addr>|<pid>" (written by ZKClusterRT::RegistProc);
+// host_addr is presumably "host:port", and everything from its first ':' is
+// dropped. Logs an error (but still returns true) when the job is unknown.
+// Returns false only if the job's process directory cannot be listed.
+bool JobManager::ListJobProcs(int job, vector<string>* procs) {
+  procs->clear();
+#ifdef USE_ZOOKEEPER
+  string job_path = GetZKJobWorkspace(job);
+  // check job path
+  if (!zk_.Exist(job_path.c_str())) {
+    LOG(ERROR) << "job " << job << " not exists";
+    return true;
+  }
+  string proc_path = job_path + kZKPathJobProc;
+  vector<string> vt;
+  // check job proc path
+  if (!zk_.GetChild(proc_path.c_str(), &vt)) {
+    return false;
+  }
+  char buf[singa::kZKBufSize];
+  for (const string& name : vt) {
+    const string pname = proc_path + "/" + name;
+    if (!zk_.GetNode(pname.c_str(), buf)) continue;
+    const string content(buf);
+    // split "<host_addr>|<pid>" at the last '|' (the pid never contains one)
+    const size_t bar = content.rfind('|');
+    const string addr = content.substr(0, bar);  // whole content if no '|'
+    const string host = addr.substr(0, addr.find(':'));  // drop ":port"
+    const string pid = bar == string::npos ? string() : content.substr(bar);
+    procs->push_back(host + pid);
+  }
+  if (!procs->size()) LOG(ERROR) << "job " << job << " not exists";
+#endif
+  return true;
+}
+
+// Lists every job znode under kZKPathApp with its number of registered
+// processes. Side effect: job znodes without processes are deleted, except
+// the last kJobsNotRemoved entries in sorted order (presumably the most
+// recent jobs). Without zookeeper this only logs an error.
+// Returns false if the application directory cannot be listed.
+bool JobManager::ListJobs(vector<JobInfo>* jobs) {
+  jobs->clear();
+#ifdef USE_ZOOKEEPER
+  vector<string> vt;
+  // get all children in app path
+  if (!zk_.GetChild(kZKPathApp.c_str(), &vt)) {
+    return false;
+  }
+  std::sort(vt.begin(), vt.end());
+  int size = static_cast<int>(vt.size());
+  vector<string> procs;
+  for (int i = 0; i < size; ++i) {
+    // job znode names end with a 10-character id; a shorter name is not a
+    // job znode and would make substr(length() - 10) below throw
+    if (vt[i].length() < 10) {
+      LOG(WARNING) << "skip unexpected znode " << kZKPathApp << "/" << vt[i];
+      continue;
+    }
+    string path = kZKPathApp + "/" + vt[i] + kZKPathJobProc;
+    if (!zk_.GetChild(path.c_str(), &procs)) continue;
+    JobInfo job;
+    string jid = vt[i].substr(vt[i].length() - 10);
+    job.id = atoi(jid.c_str());
+    job.procs = procs.size();
+    jobs->push_back(job);
+    // may need to delete it
+    if (!job.procs && (i + kJobsNotRemoved < size))
+        CleanPath(kZKPathApp + "/" + vt[i], true);
+  }
+#else
+  LOG(ERROR) << "Not supported without zookeeper";
+#endif
+  return true;
+}
+
+// Deletes all process znodes of `job`, keeping the process directory itself.
+// Succeeds trivially when that directory does not exist. Without zookeeper
+// this only logs an error.
+bool JobManager::Remove(int job) {
+#ifdef USE_ZOOKEEPER
+  const string proc_root = GetZKJobWorkspace(job) + kZKPathJobProc;
+  if (!zk_.Exist(proc_root.c_str())) return true;
+  return CleanPath(proc_root, false);
+#else
+  LOG(ERROR) << "Not supported without zookeeper";
+  return true;
+#endif
+}
+
+// Deletes every znode under the application directory, keeping the directory
+// itself. Without zookeeper this only logs an error.
+bool JobManager::RemoveAllJobs() {
+#ifdef USE_ZOOKEEPER
+  const string app_root = kZKPathApp;
+  if (!zk_.Exist(app_root.c_str())) return true;
+  return CleanPath(app_root, false);
+#else
+  LOG(ERROR) << "Not supported without zookeeper";
+  return true;
+#endif
+}
+
+// Deletes the whole SINGA znode tree, root included. Without zookeeper this
+// only logs an error.
+bool JobManager::CleanUp() {
+#ifdef USE_ZOOKEEPER
+  const string root = kZKPathSinga;
+  if (!zk_.Exist(root.c_str())) return true;
+  return CleanPath(root, true);
+#else
+  LOG(ERROR) << "Not supported without zookeeper";
+  return true;
+#endif
+}
+
+// Recursively deletes every znode below `path` (children first); when
+// `remove` is true, `path` itself is deleted too. Stops and returns false at
+// the first failure. Without zookeeper this only logs an error.
+bool JobManager::CleanPath(const string& path, bool remove) {
+#ifdef USE_ZOOKEEPER
+  vector<string> children;
+  if (!zk_.GetChild(path.c_str(), &children)) return false;
+  for (const string& name : children) {
+    const bool ok = CleanPath(path + "/" + name, true);
+    if (!ok) return false;
+  }
+  return remove ? zk_.DeleteNode(path.c_str()) : true;
+#else
+  LOG(ERROR) << "Not supported without zookeeper";
+  return true;
+#endif
+}
+
+// Extracts the body of the `cluster { ... }` section of a job config file.
+//
+// The file is scanned line by line. A line containing "cluster" starts a
+// capture from that word onward (discarding any earlier, closed capture);
+// the capture ends on the line where its braces balance, so nested messages
+// are kept intact. Returns the text between the capture's first '{' and its
+// matching '}'. LOG(FATAL)s if the file cannot be opened or no balanced
+// section is found.
+// TODO(wangsh) improve this function to make it robust, e.g. braces or the
+// word "cluster" inside comments and string values are not recognized
+string JobManager::ExtractClusterConf(const char* job_file) {
+  std::ifstream fin(job_file);
+  CHECK(fin.is_open()) << "cannot open job conf file " << job_file;
+  string line;
+  string cluster;
+  bool in_cluster = false;
+  int depth = 0;  // '{' minus '}' seen so far in the current capture
+  // getline-driven loop also terminates on read errors, not only at EOF
+  while (std::getline(fin, line)) {
+    if (!in_cluster) {
+      size_t pos = line.find("cluster");
+      if (pos == string::npos) continue;
+      in_cluster = true;
+      line = line.substr(pos);
+      cluster = "";
+      depth = 0;
+    }
+    cluster += line + "\n";
+    bool closed = false;
+    for (char c : line) {
+      if (c == '{') {
+        ++depth;
+      } else if (c == '}') {
+        --depth;
+        closed = true;
+      }
+    }
+    // the capture ends once a '}' brings the brace balance back to zero
+    if (closed && depth <= 0) in_cluster = false;
+  }
+  LOG(INFO) << "cluster configure: " << cluster;
+  // locate the first '{' and the '}' that matches it
+  const size_t s_pos = cluster.find('{');
+  size_t e_pos = string::npos;
+  if (s_pos != string::npos) {
+    int level = 0;
+    for (size_t i = s_pos; i < cluster.size(); ++i) {
+      if (cluster[i] == '{') {
+        ++level;
+      } else if (cluster[i] == '}' && --level == 0) {
+        e_pos = i;
+        break;
+      }
+    }
+  }
+  if (e_pos == string::npos) {
+    LOG(FATAL) << "cannot extract valid cluster configuration in file: "
+               << job_file;
+  }
+  return cluster.substr(s_pos + 1, e_pos - s_pos - 1);
+}
+
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/src/utils/tool.cc
----------------------------------------------------------------------
diff --git a/src/utils/tool.cc b/src/utils/tool.cc
index 4b50214..3b1df72 100644
--- a/src/utils/tool.cc
+++ b/src/utils/tool.cc
@@ -24,8 +24,8 @@
 #include <string>
 #include <vector>
 #include "singa/proto/singa.pb.h"
-#include "singa/utils/cluster_rt.h"
 #include "singa/utils/common.h"
+#include "singa/utils/job_manager.h"
 
 // File-scope state of the tool. NOTE(review): conf_dir presumably holds the
 // configuration directory and `global` the parsed global SINGA config; both
 // are set outside this hunk — TODO confirm.
 std::string conf_dir;
 singa::SingaProto global;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/src/utils/zk_service.cc
----------------------------------------------------------------------
diff --git a/src/utils/zk_service.cc b/src/utils/zk_service.cc
new file mode 100644
index 0000000..352f6f7
--- /dev/null
+++ b/src/utils/zk_service.cc
@@ -0,0 +1,326 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+* 
+*   http://www.apache.org/licenses/LICENSE-2.0
+* 
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include "singa/utils/zk_service.h"
+
+#include <glog/logging.h>
+#include <algorithm>
+
+using std::string;
+using std::to_string;
+using std::vector;
+
+namespace singa {
+
+// Child watcher installed by WGetChild. On each child event it re-arms the
+// watch; once the watched node has no children left, it invokes the stored
+// callback a single time and then clears it, so later events are ignored.
+// Any other event type, or a zookeeper error, is fatal.
+void ZKService::ChildChanges(zhandle_t *zh, int type, int state,
+                               const char *path, void *watcherCtx) {
+  // check if already callback
+  RTCallback *cb = static_cast<RTCallback*>(watcherCtx);
+  if (cb->fn == nullptr) return;
+  if (type == ZOO_CHILD_EVENT) {
+    struct String_vector child;
+    // check the child list and put another watcher
+    int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child);
+    if (ret == ZOK) {
+      const int count = child.count;
+      // the client library allocated the child names; release them
+      deallocate_String_vector(&child);
+      if (count == 0) {
+        LOG(INFO) << "child.count = 0 in path: " << path;
+        // all workers leave, we do callback now
+        (*cb->fn)(cb->ctx);
+        cb->fn = nullptr;
+      }
+    } else {
+      LOG(FATAL) << "Unhandled ZK error code: " << ret
+                 << " (zoo_wget_children " << path << ")";
+    }
+  } else {
+    LOG(FATAL) << "Unhandled callback type code: "<< type;
+  }
+}
+
+// Closes the zookeeper session handle obtained in Init().
+ZKService::~ZKService() {
+  zhandle_t* handle = zkhandle_;
+  zookeeper_close(handle);
+}
+
+// Context pointer passed to zookeeper_init() by ZKService::Init; zookeeper
+// hands it to the global watcher (WatcherGlobal), which does not read it.
+char zk_cxt[] = "ZKClusterRT";
+
+// Opens a zookeeper session to `host` (comma-separated host:port list) with
+// the given session timeout, installing WatcherGlobal as default watcher.
+// Returns false when the client library cannot create a handle.
+bool ZKService::Init(const string& host, int timeout) {
+  zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
+  void* const context = static_cast<void *>(zk_cxt);
+  zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, nullptr,
+                             context, 0);
+  if (zkhandle_ != NULL) return true;
+  LOG(ERROR) << "Error when connecting to zookeeper servers...";
+  LOG(ERROR) << "Please ensure zookeeper service is up in host(s):";
+  LOG(ERROR) << host.c_str();
+  return false;
+}
+
+// Creates znode `path` holding `val` (nullptr for no data) with zookeeper
+// creation flags `flag` (e.g. ZOO_EPHEMERAL | ZOO_SEQUENCE). While the parent
+// is missing or the connection is lost, retries up to kNumRetry times,
+// sleeping kSleepSec seconds between attempts.
+// @param output if not nullptr (buffer of kZKBufSize bytes), receives the
+//               name of the created node, or `path` if it already existed
+// @return true if the node was created or already exists; false on
+//         connection loss. Other zookeeper errors are fatal.
+bool ZKService::CreateNode(const char* path, const char* val, int flag,
+                               char* output) {
+  CHECK(zkhandle_) << "zk handler not initialized";
+  char buf[kZKBufSize];
+  buf[0] = '\0';
+  // zoo_create takes an int length; -1 stores a null value
+  const int val_len = val == nullptr ? -1 : static_cast<int>(strlen(val));
+  int ret = 0;
+  // send the zk request
+  for (int i = 0; i < kNumRetry; ++i) {
+    ret = zoo_create(zkhandle_, path, val, val_len, &ZOO_OPEN_ACL_UNSAFE,
+                     flag, buf, kZKBufSize);
+    if (ret == ZNONODE) {
+      LOG(WARNING) << "zookeeper parent node of " << path
+                  << " not exist, retry later";
+    } else if (ret == ZCONNECTIONLOSS) {
+      LOG(WARNING) << "zookeeper disconnected, retry later";
+    } else {
+      break;
+    }
+    // no need to wait after the last attempt
+    if (i + 1 < kNumRetry) sleep(kSleepSec);
+  }
+  // copy the node name to output
+  if (output != nullptr) {
+    if (ret == ZOK) {
+      snprintf(output, kZKBufSize, "%s", buf);
+    } else if (ret == ZNODEEXISTS) {
+      // buf is only filled on success, so report the requested path instead
+      snprintf(output, kZKBufSize, "%s", path);
+    }
+  }
+  if (ret == ZOK) {
+    LOG(INFO) << "created zookeeper node " << buf
+              << " (" << (val == nullptr ? "NULL" : val) << ")";
+    return true;
+  } else if (ret == ZNODEEXISTS) {
+    LOG(WARNING) << "zookeeper node " << path << " already exists";
+    return true;
+  } else if (ret == ZCONNECTIONLOSS) {
+    LOG(ERROR) << "Cannot connect to zookeeper, "
+               << "please ensure it is running properly...\n"
+               << "If want to use zookeeper in our thirdparty folder, "
+               << "you can start it by:\n"
+               << "$ ./bin/zk-service.sh start";
+    return false;
+  }
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_create " << path << ")";
+  return false;
+}
+
+// Deletes znode `path` regardless of its version (version -1). A missing
+// node only logs a warning; any other zookeeper error is fatal.
+bool ZKService::DeleteNode(const char* path) {
+  CHECK(zkhandle_) << "zk handler not initialized";
+  const int rc = zoo_delete(zkhandle_, path, -1);
+  switch (rc) {
+    case ZOK:
+      LOG(INFO) << "deleted zookeeper node " << path;
+      return true;
+    case ZNONODE:
+      LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
+      return true;
+    default:
+      LOG(FATAL) << "Unhandled ZK error code: " << rc
+                 << " (zoo_delete " << path << ")";
+      return false;
+  }
+}
+
+// Returns whether znode `path` exists. Errors other than "no node" are
+// logged as warnings and reported as "does not exist".
+bool ZKService::Exist(const char* path) {
+  CHECK(zkhandle_) << "zk handler not initialized";
+  struct Stat stat;
+  const int rc = zoo_exists(zkhandle_, path, 0, &stat);
+  if (rc != ZOK && rc != ZNONODE)
+    LOG(WARNING) << "Unhandled ZK error code: " << rc << " (zoo_exists)";
+  return rc == ZOK;
+}
+
+// Overwrites the data of znode `path` with the NUL-terminated string `val`,
+// regardless of the node's version. Returns false if the node does not
+// exist; other zookeeper errors are fatal.
+bool ZKService::UpdateNode(const char* path, const char* val) {
+  CHECK(zkhandle_) << "zk handler not initialized";
+  // set version = -1, do not check content version
+  int ret = zoo_set(zkhandle_, path, val, strlen(val), -1);
+  if (ret == ZOK) {
+    return true;
+  } else if (ret == ZNONODE) {
+    LOG(ERROR) << "zk node " << path << " does not exist";
+    return false;
+  }
+  // report the call that actually failed (zoo_set, not zoo_get)
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_set " << path << ")";
+  return false;
+}
+
+// Reads the data of znode `path` into `output` as a NUL-terminated string.
+// `output` must hold kZKBufSize bytes; longer data is truncated to
+// kZKBufSize - 1 bytes, and a node with no data yields "".
+// Returns false if the node does not exist; other errors are fatal.
+bool ZKService::GetNode(const char* path, char* output) {
+  CHECK(zkhandle_) << "zk handler not initialized";
+  struct Stat stat;
+  // keep one byte free for the terminator written below
+  int val_len = kZKBufSize - 1;
+  int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat);
+  if (ret == ZOK) {
+    // zoo_get reports -1 for a node whose data is null
+    if (val_len < 0) val_len = 0;
+    output[val_len] = '\0';
+    return true;
+  } else if (ret == ZNONODE) {
+    LOG(ERROR) << "zk node " << path << " does not exist";
+    return false;
+  }
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_get " << path << ")";
+  return false;
+}
+
+// Replaces *vt with the names of the children of znode `path` (no watch).
+// Any zookeeper error, including a missing node, is fatal.
+bool ZKService::GetChild(const char* path, vector<string>* vt) {
+  CHECK(zkhandle_) << "zk handler not initialized";
+  struct String_vector child;
+  int ret = zoo_get_children(zkhandle_, path, 0, &child);
+  if (ret == ZOK) {
+    vt->clear();
+    for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
+    // the client library allocated the names; release them after copying
+    deallocate_String_vector(&child);
+    return true;
+  }
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_get_children " << path << ")";
+  return false;
+}
+
+// Like GetChild, but also installs ChildChanges as a child watch on `path`
+// with `cb` as its context; `cb` must outlive the watch.
+bool ZKService::WGetChild(const char* path, vector<string>* vt,
+                            RTCallback *cb) {
+  CHECK(zkhandle_) << "zk handler not initialized";
+  struct String_vector child;
+  int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child);
+  if (ret == ZOK) {
+    vt->clear();
+    for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
+    // the client library allocated the names; release them after copying
+    deallocate_String_vector(&child);
+    return true;
+  }
+  LOG(FATAL) << "Unhandled ZK error code: " << ret
+             << " (zoo_wget_children " << path << ")";
+  return false;
+}
+
+
+// Default session watcher: logs when the session connects or expires and
+// ignores every other event.
+void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
+                                const char *path, void *watcherCtx) {
+  if (type != ZOO_SESSION_EVENT) return;
+  if (state == ZOO_CONNECTED_STATE) {
+    LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!";
+  } else if (state == ZOO_EXPIRED_SESSION_STATE) {
+    LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!";
+  }
+}
+
+// Records the zookeeper host and precomputes the job's znode paths; no
+// zookeeper call is made until Init().
+ZKClusterRT::ZKClusterRT(const string& host, int job_id) {
+  const string ws = GetZKJobWorkspace(job_id);
+  host_ = host;
+  workspace_ = ws;
+  group_path_ = ws + kZKPathJobGroup;
+  proc_path_ = ws + kZKPathJobProc;
+  proc_lock_path_ = ws + kZKPathJobPLock;
+}
+
+// Frees the callback records allocated by WatchSGroup.
+ZKClusterRT::~ZKClusterRT() {
+  for (auto it = cb_vec_.begin(); it != cb_vec_.end(); ++it) {
+    delete *it;
+  }
+}
+
+// Connects to zookeeper and creates the job's znode hierarchy, parents
+// first. Returns false on the first connection or creation failure.
+bool ZKClusterRT::Init() {
+  if (!zk_.Init(host_, timeout_)) return false;
+  const string* const paths[] = {&kZKPathSinga, &kZKPathApp, &workspace_,
+                                 &group_path_, &proc_path_, &proc_lock_path_};
+  for (const string* p : paths) {
+    if (!zk_.CreateNode(p->c_str(), nullptr, 0, nullptr)) return false;
+  }
+  return true;
+}
+
+// Registers this process and returns its process id, or -1 on failure.
+// The id is the position of this process's ephemeral-sequential lock node
+// among all lock nodes sorted by name. The process is then published as an
+// ephemeral node proc-<id> holding "<host_addr>|<pid>".
+int ZKClusterRT::RegistProc(const string& host_addr, int pid) {
+  const string prefix = proc_lock_path_ + "/lock-";
+  char ticket[kZKBufSize];
+  if (!zk_.CreateNode(prefix.c_str(), nullptr,
+                      ZOO_EPHEMERAL | ZOO_SEQUENCE, ticket)) {
+    return -1;
+  }
+  vector<string> tickets;
+  if (!zk_.GetChild(proc_lock_path_.c_str(), &tickets)) return -1;
+  std::sort(tickets.begin(), tickets.end());
+  const string mine(ticket);
+  int id = -1;
+  for (size_t i = 0; i < tickets.size(); ++i) {
+    if (proc_lock_path_ + "/" + tickets[i] == mine) {
+      id = static_cast<int>(i);
+      break;
+    }
+  }
+  if (id < 0) {
+    LOG(ERROR) << "cannot find own node " << ticket;
+    return -1;
+  }
+  const string node = proc_path_ + "/proc-" + to_string(id);
+  const string content = host_addr + "|" + to_string(pid);
+  const bool ok = zk_.CreateNode(node.c_str(), content.c_str(),
+                                 ZOO_EPHEMERAL, nullptr);
+  return ok ? id : -1;
+}
+
+// Returns the host address registered for `proc_id`, i.e. the value of its
+// proc znode ("<host_addr>|<pid>", see RegistProc) without the "|<pid>"
+// suffix. Returns "" if the node cannot be read; CHECK-fails if the value
+// has no '|' after a non-empty host part.
+std::string ZKClusterRT::GetProcHost(int proc_id) {
+  char val[kZKBufSize];
+  // construct file name
+  string path = proc_path_ + "/proc-" + to_string(proc_id);
+  if (!zk_.GetNode(path.c_str(), val)) return "";
+  const string content(val);
+  // search with std::string so an empty value cannot index before the buffer
+  const size_t bar = content.rfind('|');
+  CHECK(bar != string::npos && bar > 0);
+  return content.substr(0, bar);
+}
+
+// Ensures the group node for `gid` exists and watches its children; `fn` is
+// invoked with `ctx` once the group has no children left. The callback
+// record is kept in cb_vec_ and freed by the destructor. `sid` is unused.
+bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
+  CHECK_NOTNULL(fn);
+  const string path = groupPath(gid);
+  if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false;
+  RTCallback *callback = new RTCallback;
+  callback->fn = fn;
+  callback->ctx = ctx;
+  cb_vec_.push_back(callback);
+  // only the watch matters; the initial child list is discarded
+  vector<string> ignored;
+  return zk_.WGetChild(path.c_str(), &ignored, callback);
+}
+
+// Adds worker (gid, wid) to server group `s_group` as an ephemeral node,
+// which zookeeper removes automatically when this session ends.
+bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) {
+  const string node = groupPath(s_group) + workerPath(gid, wid);
+  return zk_.CreateNode(node.c_str(), nullptr, ZOO_EPHEMERAL, nullptr);
+}
+
+// Removes worker (gid, wid) from server group `s_group`.
+bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) {
+  return zk_.DeleteNode((groupPath(s_group) + workerPath(gid, wid)).c_str());
+}
+
+}  // namespace singa


Mime
View raw message