Return-Path: X-Original-To: apmail-singa-commits-archive@minotaur.apache.org Delivered-To: apmail-singa-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F4031193DD for ; Sat, 2 Apr 2016 03:46:45 +0000 (UTC) Received: (qmail 19092 invoked by uid 500); 2 Apr 2016 03:46:46 -0000 Delivered-To: apmail-singa-commits-archive@singa.apache.org Received: (qmail 19069 invoked by uid 500); 2 Apr 2016 03:46:45 -0000 Mailing-List: contact commits-help@singa.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@singa.incubator.apache.org Delivered-To: mailing list commits@singa.incubator.apache.org Received: (qmail 19060 invoked by uid 99); 2 Apr 2016 03:46:45 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 02 Apr 2016 03:46:45 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 60A38C0362 for ; Sat, 2 Apr 2016 03:46:45 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id ja-LBBPDPUsC for ; Sat, 2 Apr 2016 03:46:36 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id 3C7AC5F20E for ; Sat, 2 Apr 2016 03:46:35 +0000 (UTC) Received: (qmail 19010 invoked by uid 99); 2 Apr 2016 03:46:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 02 Apr 2016 03:46:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 752DCE0211; Sat, 2 Apr 2016 03:46:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wangwei@apache.org To: commits@singa.incubator.apache.org Date: Sat, 02 Apr 2016 03:46:34 -0000 Message-Id: <50d5701a48f64e78adbb96c05654053b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-singa git commit: SINGA-155 Remove zookeeper for single-process training Repository: incubator-singa Updated Branches: refs/heads/master b00dc32fb -> 0233049ce SINGA-155 Remove zookeeper for single-process training add single-process cluster-runtime implementation. rearrange functions in cluster_rt.h into: * cluster_rt.h - cluster-runtime interface and SPClusterRT impl * zk_service.h - zookeeper wrapper and ZKClusterRT impl * job_manager.h - job manager Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/adeced6b Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/adeced6b Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/adeced6b Branch: refs/heads/master Commit: adeced6b5ff0637ae0e2bf0817514598ae7b6f1c Parents: b00dc32 Author: WANG Sheng Authored: Thu Mar 31 20:06:12 2016 +0800 Committer: WANG Sheng Committed: Fri Apr 1 12:59:52 2016 +0800 ---------------------------------------------------------------------- Makefile.am | 7 +- include/singa/utils/cluster_rt.h | 136 ++------- include/singa/utils/job_manager.h | 79 +++++ include/singa/utils/zk_service.h | 116 ++++++++ src/utils/cluster.cc | 3 +- src/utils/cluster_rt.cc | 511 +++------------------------------ src/utils/job_manager.cc | 271 +++++++++++++++++ src/utils/tool.cc | 2 +- src/utils/zk_service.cc | 326 +++++++++++++++++++++ 9 files changed, 860 insertions(+), 591 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/Makefile.am ---------------------------------------------------------------------- diff --git a/Makefile.am b/Makefile.am index b2587a4..f185a9d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -100,7 +100,10 @@ SINGA_SRCS := src/driver.cc \ src/utils/param.cc \ src/utils/updater.cc \ src/utils/blob.cc \ - src/utils/image_transform.cc + src/utils/image_transform.cc \ + src/utils/job_manager.cc \ + src/utils/zk_service.cc + SINGA_HDRS := include/singa.h \ include/singa/utils/math_blob.h \ @@ -118,6 +121,8 @@ SINGA_HDRS := include/singa.h \ include/utils/tinydir.h \ include/utils/tokenizer.h \ include/utils/image_transform.h \ + include/utils/job_manager.cc \ + include/utils/zk_service.cc \ include/server.h \ include/worker.h \ include/stub.h \ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/include/singa/utils/cluster_rt.h ---------------------------------------------------------------------- diff --git a/include/singa/utils/cluster_rt.h b/include/singa/utils/cluster_rt.h index 5de6c16..e0125be 100644 --- a/include/singa/utils/cluster_rt.h +++ b/include/singa/utils/cluster_rt.h @@ -22,7 +22,8 @@ #ifndef SINGA_UTILS_CLUSTER_RT_H_ #define SINGA_UTILS_CLUSTER_RT_H_ -#include +#include +#include #include #include @@ -30,65 +31,11 @@ namespace singa { typedef void (*rt_callback)(void *contest); -const int kZKBufSize = 100; -// following paths are global -const std::string kZKPathSinga = "/singa"; -const std::string kZKPathSys = "/singa/sys"; -const std::string kZKPathJLock = "/singa/sys/job-lock"; -const std::string kZKPathHostIdx = "/singa/sys/host-idx"; -const std::string kZKPathApp = "/singa/app"; -const std::string kZKPathJob = "/singa/app/job-"; -// following paths are local under /singa/app/job-X -const std::string kZKPathJobGroup = "/group"; -const std::string kZKPathJobProc = "/proc"; -const std::string kZKPathJobPLock = "/proc-lock"; - -inline std::string GetZKJobWorkspace(int job_id) { - char buf[kZKBufSize]; - snprintf(buf, kZKBufSize, "%010d", job_id); - return kZKPathJob + buf; -} - struct RTCallback { rt_callback fn; void* ctx; }; -struct JobInfo { - int id; - int procs; - std::string name; -}; - -/* - * A wrapper for zookeeper service which handles error code and reconnections - */ -class ZKService { - public: - static void ChildChanges(zhandle_t* zh, int type, int state, - const char *path, void* watcherCtx); - - ~ZKService(); - bool Init(const std::string& host, int timeout); - bool CreateNode(const char* path, const char* val, int flag, char* output); - bool DeleteNode(const char* path); - bool Exist(const char* path); - bool UpdateNode(const char* path, const char* val); - bool GetNode(const char* path, char* output); - bool GetChild(const char* path, std::vector* vt); - bool WGetChild(const char* path, std::vector* vt, - RTCallback *cb); - - private: - const int kNumRetry = 5; - const int kSleepSec = 1; - - static void WatcherGlobal(zhandle_t* zh, int type, int state, - const char *path, void* watcherCtx); - - zhandle_t* zkhandle_ = nullptr; -}; - /** * ClusterRuntime is a runtime service that manages dynamic configuration * and status of the whole cluster. It mainly provides following services: @@ -97,92 +44,57 @@ class ZKService { */ class ClusterRuntime { public: - ClusterRuntime(const std::string& host, int job_id); - ClusterRuntime(const std::string& host, int job_id, int timeout); - ~ClusterRuntime(); - + virtual ~ClusterRuntime() {} /** * Initialize the runtime instance */ - bool Init(); + virtual bool Init() = 0; /** * register the process, and get a unique process id * * \return the process id, -1 if failed */ - int RegistProc(const std::string& host_addr, int pid); + virtual int RegistProc(const std::string& host_addr, int pid) = 0; /** * translate the process id to host address * * \return the host and port, "" if no such proc id */ - std::string GetProcHost(int proc_id); + virtual std::string GetProcHost(int proc_id) = 0; /** * Server: watch all workers in a server group, * will be notified when all workers have left */ - bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx); + virtual bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) = 0; /** * Worker: join a server group (i.e. start to read/update these servers) */ - bool JoinSGroup(int gid, int wid, int s_group); + virtual bool JoinSGroup(int gid, int wid, int s_group) = 0; /** * Worker: leave a server group (i.e. finish its all work) */ - bool LeaveSGroup(int gid, int wid, int s_group); - - private: - inline std::string groupPath(int gid) { - return group_path_ + "/sg" + std::to_string(gid); - } - inline std::string workerPath(int gid, int wid) { - return "/g" + std::to_string(gid) + "_w" + std::to_string(wid); - } - - int timeout_ = 30000; - std::string host_ = ""; - ZKService zk_; - std::string workspace_ = ""; - std::string group_path_ = ""; - std::string proc_path_ = ""; - std::string proc_lock_path_ = ""; - std::vector cb_vec_; + virtual bool LeaveSGroup(int gid, int wid, int s_group) = 0; }; -class JobManager { +/* + * A ClusterRuntime implementation for single-process environment + */ +class SPClusterRT : public ClusterRuntime { public: - // host is comma separated host:port pairs, each corresponding to a zk server. - // e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" - explicit JobManager(const std::string& host); - JobManager(const std::string& host, int timeout); + ~SPClusterRT(); - // NOTICE: Init must be called once, before start to use other functions - bool Init(); - // generate a unique job id - bool GenerateJobID(int* id); - // generate a list of hosts for a job conf - bool GenerateHostList(const char* host_file, const char* job_file, - std::vector* list); - // list all jobs recorded in zk - bool ListJobs(std::vector* jobs); - // list running processes for a job - bool ListJobProcs(int job, std::vector* procs); - // remove a job path in zk - bool Remove(int job); - // remove all job paths in zk - bool RemoveAllJobs(); - // remove all singa related paths in zk - bool CleanUp(); + bool Init() override; + int RegistProc(const std::string& host_addr, int pid) override; + std::string GetProcHost(int proc_id) override; + bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) override; + bool JoinSGroup(int gid, int wid, int s_group) override; + bool LeaveSGroup(int gid, int wid, int s_group) override; private: - const int kJobsNotRemoved = 10; - - bool CleanPath(const std::string& path, bool remove); - std::string ExtractClusterConf(const char* job_file); - - int timeout_ = 30000; - std::string host_ = ""; - ZKService zk_; + std::vector proc_list_; + std::map> grp_callbacks_; + std::map grp_count_; + std::mutex lock_; }; } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/include/singa/utils/job_manager.h ---------------------------------------------------------------------- diff --git a/include/singa/utils/job_manager.h b/include/singa/utils/job_manager.h new file mode 100644 index 0000000..7f1b4f1 --- /dev/null +++ b/include/singa/utils/job_manager.h @@ -0,0 +1,79 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#ifndef SINGA_UTILS_JOB_MANAGER_H_ +#define SINGA_UTILS_JOB_MANAGER_H_ + +#include +#include + +#ifdef USE_ZOOKEEPER +#include "singa/utils/zk_service.h" +#endif + +namespace singa { + +struct JobInfo { + int id; + int procs; + std::string name; +}; + +class JobManager { + public: + // host is comma separated host:port pairs, each corresponding to a zk server. + // e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" + explicit JobManager(const std::string& host); + + // NOTICE: Init must be called once, before start to use other functions + bool Init(); + // generate a unique job id + bool GenerateJobID(int* id); + // generate a list of hosts for a job conf + bool GenerateHostList(const char* host_file, const char* job_file, + std::vector* list); + // list all jobs recorded in zk + bool ListJobs(std::vector* jobs); + // list running processes for a job + bool ListJobProcs(int job, std::vector* procs); + // remove a job path in zk + bool Remove(int job); + // remove all job paths in zk + bool RemoveAllJobs(); + // remove all singa related paths in zk + bool CleanUp(); + + private: + const int kJobsNotRemoved = 10; + + bool CleanPath(const std::string& path, bool remove); + std::string ExtractClusterConf(const char* job_file); + + std::string host_ = ""; +#ifdef USE_ZOOKEEPER + int timeout_ = 30000; + ZKService zk_; +#endif +}; + +} // namespace singa + +#endif // SINGA_UTILS_JOB_MANAGER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/include/singa/utils/zk_service.h ---------------------------------------------------------------------- diff --git a/include/singa/utils/zk_service.h b/include/singa/utils/zk_service.h new file mode 100644 index 0000000..789215b --- /dev/null +++ b/include/singa/utils/zk_service.h @@ -0,0 +1,116 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#ifndef SINGA_UTILS_ZK_SERVICE_H_ +#define SINGA_UTILS_ZK_SERVICE_H_ + +#include +#include +#include + +#include "singa/utils/cluster_rt.h" + +namespace singa { + +const int kZKBufSize = 100; +// following paths are global +const std::string kZKPathSinga = "/singa"; +const std::string kZKPathSys = "/singa/sys"; +const std::string kZKPathJLock = "/singa/sys/job-lock"; +const std::string kZKPathHostIdx = "/singa/sys/host-idx"; +const std::string kZKPathApp = "/singa/app"; +const std::string kZKPathJob = "/singa/app/job-"; +// following paths are local under /singa/app/job-X +const std::string kZKPathJobGroup = "/group"; +const std::string kZKPathJobProc = "/proc"; +const std::string kZKPathJobPLock = "/proc-lock"; + +inline std::string GetZKJobWorkspace(int job_id) { + char buf[kZKBufSize]; + snprintf(buf, kZKBufSize, "%010d", job_id); + return kZKPathJob + buf; +} + +/* + * A wrapper for zookeeper service which handles error code and reconnections + */ +class ZKService { + public: + static void ChildChanges(zhandle_t* zh, int type, int state, + const char *path, void* watcherCtx); + + ~ZKService(); + bool Init(const std::string& host, int timeout); + bool CreateNode(const char* path, const char* val, int flag, char* output); + bool DeleteNode(const char* path); + bool Exist(const char* path); + bool UpdateNode(const char* path, const char* val); + bool GetNode(const char* path, char* output); + bool GetChild(const char* path, std::vector* vt); + bool WGetChild(const char* path, std::vector* vt, + RTCallback *cb); + + private: + const int kNumRetry = 5; + const int kSleepSec = 1; + + static void WatcherGlobal(zhandle_t* zh, int type, int state, + const char *path, void* watcherCtx); + + zhandle_t* zkhandle_ = nullptr; +}; + +/* + * A ClusterRuntime implementation using zookeeper + */ +class ZKClusterRT : public ClusterRuntime { + public: + ZKClusterRT(const std::string& host, int job_id); + ~ZKClusterRT(); + + bool Init() override; + int RegistProc(const std::string& host_addr, int pid) override; + std::string GetProcHost(int proc_id) override; + bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) override; + bool JoinSGroup(int gid, int wid, int s_group) override; + bool LeaveSGroup(int gid, int wid, int s_group) override; + + private: + inline std::string groupPath(int gid) { + return group_path_ + "/sg" + std::to_string(gid); + } + inline std::string workerPath(int gid, int wid) { + return "/g" + std::to_string(gid) + "_w" + std::to_string(wid); + } + + int timeout_ = 30000; + std::string host_ = ""; + ZKService zk_; + std::string workspace_ = ""; + std::string group_path_ = ""; + std::string proc_path_ = ""; + std::string proc_lock_path_ = ""; + std::vector cb_vec_; +}; + +} // namespace singa + +#endif // SINGA_UTILS_ZK_SERVICE_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/src/utils/cluster.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc index 5a8b87a..53c0444 100644 --- a/src/utils/cluster.cc +++ b/src/utils/cluster.cc @@ -81,7 +81,8 @@ void Cluster::Init(int job, const SingaProto& singaConf, (i * grp_size + j) / cluster_.nservers_per_procs() + offset; } } - cluster_rt_ = new ClusterRuntime(singa_.zookeeper_host(), job); + // cluster_rt_ = new ZKClusterRT(singa_.zookeeper_host(), job); + cluster_rt_ = new SPClusterRT(); cluster_rt_->Init(); hostip_ = GetHostIP(); } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/src/utils/cluster_rt.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc index cdf8aab..33e04dc 100644 --- a/src/utils/cluster_rt.cc +++ b/src/utils/cluster_rt.cc @@ -35,505 +35,64 @@ using std::vector; namespace singa { -void ZKService::ChildChanges(zhandle_t *zh, int type, int state, - const char *path, void *watcherCtx) { - // check if already callback - RTCallback *cb = static_cast(watcherCtx); - if (cb->fn == nullptr) return; - if (type == ZOO_CHILD_EVENT) { - struct String_vector child; - // check the child list and put another watcher - int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child); - if (ret == ZOK) { - if (child.count == 0) { - LOG(INFO) << "child.count = 0 in path: " << path; - // all workers leave, we do callback now - (*cb->fn)(cb->ctx); - cb->fn = nullptr; - } - } else { - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_wget_children " << path << ")"; - } - } else { - LOG(FATAL) << "Unhandled callback type code: "<< type; - } -} - -ZKService::~ZKService() { - // close zookeeper handler - zookeeper_close(zkhandle_); -} - -char zk_cxt[] = "ClusterRuntime"; - -bool ZKService::Init(const string& host, int timeout) { - zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); - zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, 0, - static_cast(zk_cxt), 0); - if (zkhandle_ == NULL) { - LOG(ERROR) << "Error when connecting to zookeeper servers..."; - LOG(ERROR) << "Please ensure zookeeper service is up in host(s):"; - LOG(ERROR) << host.c_str(); - return false; - } - - return true; -} - -bool ZKService::CreateNode(const char* path, const char* val, int flag, - char* output) { - CHECK(zkhandle_) << "zk handler not initialized"; - char buf[kZKBufSize]; - int ret = 0; - // send the zk request - for (int i = 0; i < kNumRetry; ++i) { - ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val), - &ZOO_OPEN_ACL_UNSAFE, flag, buf, kZKBufSize); - if (ret == ZNONODE) { - LOG(WARNING) << "zookeeper parent node of " << path - << " not exist, retry later"; - } else if (ret == ZCONNECTIONLOSS) { - LOG(WARNING) << "zookeeper disconnected, retry later"; - } else { - break; - } - sleep(kSleepSec); - } - // copy the node name to output - if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) { - snprintf(output, kZKBufSize, "%s", buf); - // use snprintf instead of strcpy - // strcpy(output, buf); - } - if (ret == ZOK) { - LOG(INFO) << "created zookeeper node " << buf - << " (" << (val == nullptr ? "NULL" : val) << ")"; - return true; - } else if (ret == ZNODEEXISTS) { - LOG(WARNING) << "zookeeper node " << path << " already exists"; - return true; - } else if (ret == ZCONNECTIONLOSS) { - LOG(ERROR) << "Cannot connect to zookeeper, " - << "please ensure it is running properly...\n" - << "If want to use zookeeper in our thirdparty folder, " - << "you can start it by:\n" - << "$ ./bin/zk-service.sh start"; - return false; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_create " << path << ")"; - return false; -} - -bool ZKService::DeleteNode(const char* path) { - CHECK(zkhandle_) << "zk handler not initialized"; - int ret = zoo_delete(zkhandle_, path, -1); - if (ret == ZOK) { - LOG(INFO) << "deleted zookeeper node " << path; - return true; - } else if (ret == ZNONODE) { - LOG(WARNING) << "try to delete an non-existing zookeeper node " << path; - return true; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_delete " << path << ")"; - return false; -} - -bool ZKService::Exist(const char* path) { - CHECK(zkhandle_) << "zk handler not initialized"; - struct Stat stat; - int ret = zoo_exists(zkhandle_, path, 0, &stat); - if (ret == ZOK) return true; - else if (ret == ZNONODE) return false; - LOG(WARNING) << "Unhandled ZK error code: " << ret << " (zoo_exists)"; - return false; -} - -bool ZKService::UpdateNode(const char* path, const char* val) { - CHECK(zkhandle_) << "zk handler not initialized"; - // set version = -1, do not check content version - int ret = zoo_set(zkhandle_, path, val, strlen(val), -1); - if (ret == ZOK) { - return true; - } else if (ret == ZNONODE) { - LOG(ERROR) << "zk node " << path << " does not exist"; - return false; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_get " << path << ")"; - return false; -} - -bool ZKService::GetNode(const char* path, char* output) { - CHECK(zkhandle_) << "zk handler not initialized"; - struct Stat stat; - int val_len = kZKBufSize; - int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat); - if (ret == ZOK) { - output[val_len] = '\0'; - return true; - } else if (ret == ZNONODE) { - LOG(ERROR) << "zk node " << path << " does not exist"; - return false; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_get " << path << ")"; - return false; -} - -bool ZKService::GetChild(const char* path, vector* vt) { - CHECK(zkhandle_) << "zk handler not initialized"; - struct String_vector child; - int ret = zoo_get_children(zkhandle_, path, 0, &child); - if (ret == ZOK) { - vt->clear(); - for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]); - return true; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_get_children " << path << ")"; - return false; -} - -bool ZKService::WGetChild(const char* path, vector* vt, - RTCallback *cb) { - CHECK(zkhandle_) << "zk handler not initialized"; - struct String_vector child; - int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child); - if (ret == ZOK) { - vt->clear(); - for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]); - return true; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_get_children " << path << ")"; - return false; -} - - -void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state, - const char *path, void *watcherCtx) { - if (type == ZOO_SESSION_EVENT) { - if (state == ZOO_CONNECTED_STATE) - LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!"; - else if (state == ZOO_EXPIRED_SESSION_STATE) - LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!"; - } -} - -ClusterRuntime::ClusterRuntime(const string& host, int job_id) - : ClusterRuntime(host, job_id, 30000) {} - -ClusterRuntime::ClusterRuntime(const string& host, int job_id, int timeout) { - host_ = host; - timeout_ = timeout; - workspace_ = GetZKJobWorkspace(job_id); - group_path_ = workspace_ + kZKPathJobGroup; - proc_path_ = workspace_ + kZKPathJobProc; - proc_lock_path_ = workspace_ + kZKPathJobPLock; -} - -ClusterRuntime::~ClusterRuntime() { +SPClusterRT::~SPClusterRT() { // release callback vector - for (RTCallback* p : cb_vec_) { + for (auto list : grp_callbacks_) + for (RTCallback* p : list.second) { delete p; } } -bool ClusterRuntime::Init() { - if (!zk_.Init(host_, timeout_)) return false; - if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(workspace_.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(group_path_.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(proc_path_.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(proc_lock_path_.c_str(), nullptr, 0, nullptr)) - return false; +bool SPClusterRT::Init() { return true; } -int ClusterRuntime::RegistProc(const string& host_addr, int pid) { - char buf[kZKBufSize]; - string lock = proc_lock_path_ + "/lock-"; - if (!zk_.CreateNode(lock.c_str(), nullptr, - ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) { - return -1; - } - // get all children in lock folder - vector vt; - if (!zk_.GetChild(proc_lock_path_.c_str(), &vt)) { - return -1; - } - // find own position among all locks - int id = -1; - std::sort(vt.begin(), vt.end()); - for (int i = 0; i < static_cast(vt.size()); ++i) { - if (proc_lock_path_+"/"+vt[i] == buf) { - id = i; - break; - } - } - if (id == -1) { - LOG(ERROR) << "cannot find own node " << buf; - return -1; - } - // create a new node in proc path - string path = proc_path_ + "/proc-" + to_string(id); - string content = host_addr + "|" + to_string(pid); - if (!zk_.CreateNode(path.c_str(), content.c_str(), ZOO_EPHEMERAL, - nullptr)) { - return -1; - } - return id; +int SPClusterRT::RegistProc(const string& host_addr, int pid) { + int ret; + lock_.lock(); + proc_list_.push_back(host_addr + std::to_string(pid)); + ret = proc_list_.size()-1; + lock_.unlock(); + return ret; +} + +string SPClusterRT::GetProcHost(int proc_id) { + if (proc_list_.size() < (unsigned)proc_id + 1) return ""; + return proc_list_[proc_id]; } -bool ClusterRuntime::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) { - CHECK_NOTNULL(fn); - string path = groupPath(gid); - // create zk node - if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false; - vector child; +bool SPClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) { // store the callback function and context for later usage RTCallback *cb = new RTCallback; cb->fn = fn; cb->ctx = ctx; - cb_vec_.push_back(cb); - // start to watch on the zk node, does not care about the first return value - return zk_.WGetChild(path.c_str(), &child, cb); -} - -std::string ClusterRuntime::GetProcHost(int proc_id) { - char val[kZKBufSize]; - // construct file name - string path = proc_path_ + "/proc-" + to_string(proc_id); - if (!zk_.GetNode(path.c_str(), val)) return ""; - int len = strlen(val) - 1; - while (len && val[len] != '|') --len; - CHECK(len); - val[len] = '\0'; - return string(val); -} - -bool ClusterRuntime::JoinSGroup(int gid, int wid, int s_group) { - string path = groupPath(s_group) + workerPath(gid, wid); - // try to create an ephemeral node under server group path - return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr); -} - -bool ClusterRuntime::LeaveSGroup(int gid, int wid, int s_group) { - string path = groupPath(s_group) + workerPath(gid, wid); - return zk_.DeleteNode(path.c_str()); -} - -JobManager::JobManager(const string& host) : JobManager(host, 30000) {} - -JobManager::JobManager(const string& host, int timeout) { - host_ = host; - timeout_ = timeout; -} - -bool JobManager::Init() { - if (!zk_.Init(host_, timeout_)) return false; - if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(kZKPathSys.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(kZKPathJLock.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(kZKPathHostIdx.c_str(), "0", 0, nullptr)) - return false; - if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr)) - return false; + lock_.lock(); + if (grp_callbacks_.count(gid) == 0) + grp_callbacks_[gid] = vector{}; + grp_callbacks_[gid].push_back(cb); + lock_.unlock(); return true; } -bool JobManager::GenerateJobID(int* id) { - char buf[kZKBufSize]; - string lock = kZKPathJLock + "/lock-"; - if (!zk_.CreateNode(lock.c_str(), nullptr, - ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) { - return false; - } - *id = atoi(buf + strlen(buf) - 10); +bool SPClusterRT::JoinSGroup(int gid, int wid, int s_group) { + lock_.lock(); + if (grp_count_.count(gid) == 0) + grp_count_[gid] = 0; + grp_count_[gid]++; + lock_.unlock(); return true; } -bool JobManager::GenerateHostList(const char* host_file, const char* job_file, - vector* list) { - int nprocs = 1; - // compute required #process from job conf - if (job_file != nullptr) { - ClusterProto cluster; - google::protobuf::TextFormat::ParseFromString(ExtractClusterConf(job_file), - &cluster); - int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group() - / cluster.nworkers_per_procs(); - int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group() - / cluster.nservers_per_procs(); - if (cluster.server_worker_separate()) - nprocs = nworker_procs + nserver_procs; - else - nprocs = std::max(nworker_procs, nserver_procs); - } - // get available host list from global conf - std::ifstream hostfile(host_file); - if (!hostfile.is_open()) { - LOG(FATAL) << "Cannot open file: " << host_file; - } - vector hosts; - string host; - while (!hostfile.eof()) { - getline(hostfile, host); - if (!host.length() || host[0] == '#') continue; - hosts.push_back(host); - } - if (!hosts.size()) { - LOG(FATAL) << "Empty host file"; - } - // read next host index - char val[kZKBufSize]; - if (!zk_.GetNode(kZKPathHostIdx.c_str(), val)) return false; - int next = atoi(val); - // generate host list - list->clear(); - for (int i = 0; i < nprocs; ++i) { - list->push_back(hosts[(next + i) % hosts.size()]); - } - // write next host index - next = (next + nprocs) % hosts.size(); - snprintf(val, kZKBufSize, "%d", next); - if (!zk_.UpdateNode(kZKPathHostIdx.c_str(), val)) return false; - return true; -} - -bool JobManager::ListJobProcs(int job, vector* procs) { - procs->clear(); - string job_path = GetZKJobWorkspace(job); - // check job path - if (!zk_.Exist(job_path.c_str())) { - LOG(ERROR) << "job " << job << " not exists"; - return true; - } - string proc_path = job_path + kZKPathJobProc; - vector vt; - // check job proc path - if (!zk_.GetChild(proc_path.c_str(), &vt)) { - return false; - } - char buf[singa::kZKBufSize]; - for (string pname : vt) { - pname = proc_path + "/" + pname; - if (!zk_.GetNode(pname.c_str(), buf)) continue; - std::string proc = ""; - for (int i = 0; buf[i] != '\0'; ++i) { - if (buf[i] == ':') { - buf[i] = '\0'; - proc += buf; - } else if (buf[i] == '|') { - proc += buf + i; +bool SPClusterRT::LeaveSGroup(int gid, int wid, int s_group) { + lock_.lock(); + if (--grp_count_[gid] == 0) { + for (RTCallback* cb : grp_callbacks_[gid]) { + (*cb->fn)(cb->ctx); + cb->fn = nullptr; } - } - procs->push_back(proc); } - if (!procs->size()) LOG(ERROR) << "job " << job << " not exists"; + lock_.unlock(); return true; } -bool JobManager::ListJobs(vector* jobs) { - // get all children in app path - jobs->clear(); - vector vt; - if (!zk_.GetChild(kZKPathApp.c_str(), &vt)) { - return false; - } - std::sort(vt.begin(), vt.end()); - int size = static_cast(vt.size()); - vector procs; - for (int i = 0; i < size; ++i) { - string path = kZKPathApp + "/" + vt[i] + kZKPathJobProc; - if (!zk_.GetChild(path.c_str(), &procs)) continue; - JobInfo job; - string jid = vt[i].substr(vt[i].length()-10); - job.id = atoi(jid.c_str()); - job.procs = procs.size(); - jobs->push_back(job); - // may need to delete it - if (!job.procs && (i + kJobsNotRemoved < size)) - CleanPath(kZKPathApp + "/" + vt[i], true); - } - return true; -} - -bool JobManager::Remove(int job) { - string path = GetZKJobWorkspace(job) + kZKPathJobProc; - if (zk_.Exist(path.c_str())) { - return CleanPath(path.c_str(), false); - } - return true; -} - -bool JobManager::RemoveAllJobs() { - if (zk_.Exist(kZKPathApp.c_str())) { - return CleanPath(kZKPathApp.c_str(), false); - } - return true; -} - -bool JobManager::CleanUp() { - if (zk_.Exist(kZKPathSinga.c_str())) { - return CleanPath(kZKPathSinga.c_str(), true); - } - return true; -} - -bool JobManager::CleanPath(const string& path, bool remove) { - vector child; - if (!zk_.GetChild(path.c_str(), &child)) return false; - for (string c : child) { - if (!CleanPath(path + "/" + c, true)) return false; - } - if (remove) return zk_.DeleteNode(path.c_str()); - return true; -} - -// extract cluster configuration part from the job config file -// TODO(wangsh) improve this function to make it robust -string JobManager::ExtractClusterConf(const char* job_file) { - std::ifstream fin(job_file); - CHECK(fin.is_open()) << "cannot open job conf file " << job_file; - string line; - string cluster; - bool in_cluster = false; - while (!fin.eof()) { - std::getline(fin, line); - if (in_cluster == false) { - size_t pos = line.find("cluster"); - if (pos == std::string::npos) continue; - in_cluster = true; - line = line.substr(pos); - cluster = ""; - } - if (in_cluster == true) { - cluster += line + "\n"; - if (line.find("}") != std::string::npos) - in_cluster = false; - } - } - LOG(INFO) << "cluster configure: " << cluster; - size_t s_pos = cluster.find("{"); - size_t e_pos = cluster.find("}"); - if (s_pos == std::string::npos || e_pos == std::string::npos) { - LOG(FATAL) << "cannot extract valid cluster configuration in file: " - << job_file; - } - return cluster.substr(s_pos + 1, e_pos - s_pos-1); -} - } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/src/utils/job_manager.cc ---------------------------------------------------------------------- diff --git a/src/utils/job_manager.cc b/src/utils/job_manager.cc new file mode 100644 index 0000000..2ea5b1b --- /dev/null +++ b/src/utils/job_manager.cc @@ -0,0 +1,271 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#include "singa/utils/job_manager.h" + +#include +#include +#include +#include +#include +#include +#include "singa/proto/job.pb.h" + +using std::string; +using std::vector; + +namespace singa { + +JobManager::JobManager(const string& host) { + host_ = host; +} + +bool JobManager::Init() { +#ifdef USE_ZOOKEEPER + if (!zk_.Init(host_, timeout_)) return false; + if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr)) + return false; + if (!zk_.CreateNode(kZKPathSys.c_str(), nullptr, 0, nullptr)) + return false; + if (!zk_.CreateNode(kZKPathJLock.c_str(), nullptr, 0, nullptr)) + return false; + if (!zk_.CreateNode(kZKPathHostIdx.c_str(), "0", 0, nullptr)) + return false; + if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr)) + return false; +#endif + return true; +} + +bool JobManager::GenerateJobID(int* id) { +#ifdef USE_ZOOKEEPER + char buf[kZKBufSize]; + string lock = kZKPathJLock + "/lock-"; + if (!zk_.CreateNode(lock.c_str(), nullptr, + ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) { + return false; + } + *id = atoi(buf + strlen(buf) - 10); +#else + *id = 0; +#endif + return true; +} + +bool JobManager::GenerateHostList(const char* host_file, const char* job_file, + vector* list) { + int nprocs = 1; + list->clear(); + // compute required #process from job conf + if (job_file != nullptr) { + ClusterProto cluster; + google::protobuf::TextFormat::ParseFromString(ExtractClusterConf(job_file), + &cluster); + int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group() + / cluster.nworkers_per_procs(); + int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group() + / cluster.nservers_per_procs(); + if (cluster.server_worker_separate()) + nprocs = nworker_procs + nserver_procs; + else + nprocs = std::max(nworker_procs, nserver_procs); + } +#ifdef USE_ZOOKEEPER + // get available host list from global conf + std::ifstream hostfile(host_file); + if (!hostfile.is_open()) { + LOG(FATAL) << "Cannot open file: " << host_file; + } + vector hosts; + string host; + while (!hostfile.eof()) { + getline(hostfile, host); + if (!host.length() || host[0] == '#') continue; + hosts.push_back(host); + } + if (!hosts.size()) { + LOG(FATAL) << "Empty host file"; + } + // read next host index + char val[kZKBufSize]; + if (!zk_.GetNode(kZKPathHostIdx.c_str(), val)) return false; + int next = atoi(val); + // generate host list + for (int i = 0; i < nprocs; ++i) { + list->push_back(hosts[(next + i) % hosts.size()]); + } + // write next host index + next = (next + nprocs) % hosts.size(); + snprintf(val, kZKBufSize, "%d", next); + if (!zk_.UpdateNode(kZKPathHostIdx.c_str(), val)) return false; +#else + CHECK_EQ(nprocs, 1) << "To run multi-process job, please enable zookeeper"; + list->push_back("localhost"); +#endif + return true; +} + +bool JobManager::ListJobProcs(int job, vector* procs) { + procs->clear(); +#ifdef USE_ZOOKEEPER + string job_path = GetZKJobWorkspace(job); + // check job path + if (!zk_.Exist(job_path.c_str())) { + LOG(ERROR) << "job " << job << " not exists"; + return true; + } + string proc_path = job_path + kZKPathJobProc; + vector vt; + // check job proc path + if (!zk_.GetChild(proc_path.c_str(), &vt)) { + return false; + } + char buf[singa::kZKBufSize]; + for (string pname : vt) { + pname = proc_path + "/" + pname; + if (!zk_.GetNode(pname.c_str(), buf)) continue; + std::string proc = ""; + for (int i = 0; buf[i] != '\0'; ++i) { + if (buf[i] == ':') { + buf[i] = '\0'; + proc += buf; + } else if (buf[i] == '|') { + proc += buf + i; + } + } + procs->push_back(proc); + } + if (!procs->size()) LOG(ERROR) << "job " << job << " not exists"; +#endif + return true; +} + +bool JobManager::ListJobs(vector* jobs) { + jobs->clear(); +#ifdef USE_ZOOKEEPER + vector vt; + // get all children in app path + if (!zk_.GetChild(kZKPathApp.c_str(), &vt)) { + return false; + } + std::sort(vt.begin(), vt.end()); + int size = static_cast(vt.size()); + vector procs; + for (int i = 0; i < size; ++i) { + string path = kZKPathApp + "/" + vt[i] + kZKPathJobProc; + if (!zk_.GetChild(path.c_str(), &procs)) continue; + JobInfo job; + string jid = vt[i].substr(vt[i].length()-10); + job.id = atoi(jid.c_str()); + job.procs = procs.size(); + jobs->push_back(job); + // may need to delete it + if (!job.procs && (i + kJobsNotRemoved < size)) + CleanPath(kZKPathApp + "/" + vt[i], true); + } +#else + LOG(ERROR) << "Not supported without zookeeper"; +#endif + return true; +} + +bool JobManager::Remove(int job) { +#ifdef USE_ZOOKEEPER + string path = GetZKJobWorkspace(job) + kZKPathJobProc; + if (zk_.Exist(path.c_str())) { + return CleanPath(path.c_str(), false); + } +#else + LOG(ERROR) << "Not supported without zookeeper"; +#endif + return true; +} + +bool JobManager::RemoveAllJobs() { +#ifdef USE_ZOOKEEPER + if (zk_.Exist(kZKPathApp.c_str())) { + return CleanPath(kZKPathApp.c_str(), false); + } +#else + LOG(ERROR) << "Not supported without zookeeper"; +#endif + return true; +} + +bool JobManager::CleanUp() { +#ifdef USE_ZOOKEEPER + if (zk_.Exist(kZKPathSinga.c_str())) { + return CleanPath(kZKPathSinga.c_str(), true); + } +#else + LOG(ERROR) << "Not supported without zookeeper"; +#endif + return true; +} + +bool JobManager::CleanPath(const string& path, bool remove) { +#ifdef USE_ZOOKEEPER + vector child; + if (!zk_.GetChild(path.c_str(), &child)) return false; + for (string c : child) { + if (!CleanPath(path + "/" + c, true)) return false; + } + if (remove) return zk_.DeleteNode(path.c_str()); +#else + LOG(ERROR) << "Not supported without zookeeper"; +#endif + return true; +} + +// extract cluster configuration part from the job config file +// TODO(wangsh) improve this function to make it robust +string JobManager::ExtractClusterConf(const char* job_file) { + std::ifstream fin(job_file); + CHECK(fin.is_open()) << "cannot open job conf file " << job_file; + string line; + string cluster; + bool in_cluster = false; + while (!fin.eof()) { + std::getline(fin, line); + if (in_cluster == false) { + size_t pos = line.find("cluster"); + if (pos == std::string::npos) continue; + in_cluster = true; + line = line.substr(pos); + cluster = ""; + } + if (in_cluster == true) { + cluster += line + "\n"; + if (line.find("}") != std::string::npos) + in_cluster = false; + } + } + LOG(INFO) << "cluster configure: " << cluster; + size_t s_pos = cluster.find("{"); + size_t e_pos = cluster.find("}"); + if (s_pos == std::string::npos || e_pos == std::string::npos) { + LOG(FATAL) << "cannot extract valid cluster configuration in file: " + << job_file; + } + return cluster.substr(s_pos + 1, e_pos - s_pos-1); +} + +} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/src/utils/tool.cc ---------------------------------------------------------------------- diff --git a/src/utils/tool.cc b/src/utils/tool.cc index 4b50214..3b1df72 100644 --- a/src/utils/tool.cc +++ b/src/utils/tool.cc @@ -24,8 +24,8 @@ #include #include #include "singa/proto/singa.pb.h" -#include "singa/utils/cluster_rt.h" #include "singa/utils/common.h" +#include "singa/utils/job_manager.h" std::string conf_dir; singa::SingaProto global; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/adeced6b/src/utils/zk_service.cc ---------------------------------------------------------------------- diff --git a/src/utils/zk_service.cc b/src/utils/zk_service.cc new file mode 100644 index 0000000..352f6f7 --- /dev/null +++ b/src/utils/zk_service.cc @@ -0,0 +1,326 @@ +/************************************************************ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*************************************************************/ + +#include "singa/utils/zk_service.h" + +#include +#include + +using std::string; +using std::to_string; +using std::vector; + +namespace singa { + +void ZKService::ChildChanges(zhandle_t *zh, int type, int state, + const char *path, void *watcherCtx) { + // check if already callback + RTCallback *cb = static_cast(watcherCtx); + if (cb->fn == nullptr) return; + if (type == ZOO_CHILD_EVENT) { + struct String_vector child; + // check the child list and put another watcher + int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child); + if (ret == ZOK) { + if (child.count == 0) { + LOG(INFO) << "child.count = 0 in path: " << path; + // all workers leave, we do callback now + (*cb->fn)(cb->ctx); + cb->fn = nullptr; + } + } else { + LOG(FATAL) << "Unhandled ZK error code: " << ret + << " (zoo_wget_children " << path << ")"; + } + } else { + LOG(FATAL) << "Unhandled callback type code: "<< type; + } +} + +ZKService::~ZKService() { + // close zookeeper handler + zookeeper_close(zkhandle_); +} + +char zk_cxt[] = "ZKClusterRT"; + +bool ZKService::Init(const string& host, int timeout) { + zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); + zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, 0, + static_cast(zk_cxt), 0); + if (zkhandle_ == NULL) { + LOG(ERROR) << "Error when connecting to zookeeper servers..."; + LOG(ERROR) << "Please ensure zookeeper service is up in host(s):"; + LOG(ERROR) << host.c_str(); + return false; + } + + return true; +} + +bool ZKService::CreateNode(const char* path, const char* val, int flag, + char* output) { + CHECK(zkhandle_) << "zk handler not initialized"; + char buf[kZKBufSize]; + int ret = 0; + // send the zk request + for (int i = 0; i < kNumRetry; ++i) { + ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val), + &ZOO_OPEN_ACL_UNSAFE, flag, buf, kZKBufSize); + if (ret == ZNONODE) { + LOG(WARNING) << "zookeeper parent node of " << path + << " not exist, retry later"; + } else if (ret == ZCONNECTIONLOSS) { + LOG(WARNING) << "zookeeper disconnected, retry later"; + } else { + break; + } + sleep(kSleepSec); + } + // copy the node name to output + if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) { + snprintf(output, kZKBufSize, "%s", buf); + // use snprintf instead of strcpy + // strcpy(output, buf); + } + if (ret == ZOK) { + LOG(INFO) << "created zookeeper node " << buf + << " (" << (val == nullptr ? "NULL" : val) << ")"; + return true; + } else if (ret == ZNODEEXISTS) { + LOG(WARNING) << "zookeeper node " << path << " already exists"; + return true; + } else if (ret == ZCONNECTIONLOSS) { + LOG(ERROR) << "Cannot connect to zookeeper, " + << "please ensure it is running properly...\n" + << "If want to use zookeeper in our thirdparty folder, " + << "you can start it by:\n" + << "$ ./bin/zk-service.sh start"; + return false; + } + LOG(FATAL) << "Unhandled ZK error code: " << ret + << " (zoo_create " << path << ")"; + return false; +} + +bool ZKService::DeleteNode(const char* path) { + CHECK(zkhandle_) << "zk handler not initialized"; + int ret = zoo_delete(zkhandle_, path, -1); + if (ret == ZOK) { + LOG(INFO) << "deleted zookeeper node " << path; + return true; + } else if (ret == ZNONODE) { + LOG(WARNING) << "try to delete an non-existing zookeeper node " << path; + return true; + } + LOG(FATAL) << "Unhandled ZK error code: " << ret + << " (zoo_delete " << path << ")"; + return false; +} + +bool ZKService::Exist(const char* path) { + CHECK(zkhandle_) << "zk handler not initialized"; + struct Stat stat; + int ret = zoo_exists(zkhandle_, path, 0, &stat); + if (ret == ZOK) return true; + else if (ret == ZNONODE) return false; + LOG(WARNING) << "Unhandled ZK error code: " << ret << " (zoo_exists)"; + return false; +} + +bool ZKService::UpdateNode(const char* path, const char* val) { + CHECK(zkhandle_) << "zk handler not initialized"; + // set version = -1, do not check content version + int ret = zoo_set(zkhandle_, path, val, strlen(val), -1); + if (ret == ZOK) { + return true; + } else if (ret == ZNONODE) { + LOG(ERROR) << "zk node " << path << " does not exist"; + return false; + } + LOG(FATAL) << "Unhandled ZK error code: " << ret + << " (zoo_get " << path << ")"; + return false; +} + +bool ZKService::GetNode(const char* path, char* output) { + CHECK(zkhandle_) << "zk handler not initialized"; + struct Stat stat; + int val_len = kZKBufSize; + int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat); + if (ret == ZOK) { + output[val_len] = '\0'; + return true; + } else if (ret == ZNONODE) { + LOG(ERROR) << "zk node " << path << " does not exist"; + return false; + } + LOG(FATAL) << "Unhandled ZK error code: " << ret + << " (zoo_get " << path << ")"; + return false; +} + +bool ZKService::GetChild(const char* path, vector* vt) { + CHECK(zkhandle_) << "zk handler not initialized"; + struct String_vector child; + int ret = zoo_get_children(zkhandle_, path, 0, &child); + if (ret == ZOK) { + vt->clear(); + for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]); + return true; + } + LOG(FATAL) << "Unhandled ZK error code: " << ret + << " (zoo_get_children " << path << ")"; + return false; +} + +bool ZKService::WGetChild(const char* path, vector* vt, + RTCallback *cb) { + CHECK(zkhandle_) << "zk handler not initialized"; + struct String_vector child; + int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child); + if (ret == ZOK) { + vt->clear(); + for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]); + return true; + } + LOG(FATAL) << "Unhandled ZK error code: " << ret + << " (zoo_get_children " << path << ")"; + return false; +} + + +void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state, + const char *path, void *watcherCtx) { + if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_CONNECTED_STATE) + LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!"; + else if (state == ZOO_EXPIRED_SESSION_STATE) + LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!"; + } +} + +ZKClusterRT::ZKClusterRT(const string& host, int job_id) { + host_ = host; + workspace_ = GetZKJobWorkspace(job_id); + group_path_ = workspace_ + kZKPathJobGroup; + proc_path_ = workspace_ + kZKPathJobProc; + proc_lock_path_ = workspace_ + kZKPathJobPLock; +} + +ZKClusterRT::~ZKClusterRT() { + // release callback vector + for (RTCallback* p : cb_vec_) { + delete p; + } +} + +bool ZKClusterRT::Init() { + if (!zk_.Init(host_, timeout_)) return false; + if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr)) + return false; + if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr)) + return false; + if (!zk_.CreateNode(workspace_.c_str(), nullptr, 0, nullptr)) + return false; + if (!zk_.CreateNode(group_path_.c_str(), nullptr, 0, nullptr)) + return false; + if (!zk_.CreateNode(proc_path_.c_str(), nullptr, 0, nullptr)) + return false; + if (!zk_.CreateNode(proc_lock_path_.c_str(), nullptr, 0, nullptr)) + return false; + return true; +} + +int ZKClusterRT::RegistProc(const string& host_addr, int pid) { + char buf[kZKBufSize]; + string lock = proc_lock_path_ + "/lock-"; + if (!zk_.CreateNode(lock.c_str(), nullptr, + ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) { + return -1; + } + // get all children in lock folder + vector vt; + if (!zk_.GetChild(proc_lock_path_.c_str(), &vt)) { + return -1; + } + // find own position among all locks + int id = -1; + std::sort(vt.begin(), vt.end()); + for (int i = 0; i < static_cast(vt.size()); ++i) { + if (proc_lock_path_+"/"+vt[i] == buf) { + id = i; + break; + } + } + if (id == -1) { + LOG(ERROR) << "cannot find own node " << buf; + return -1; + } + // create a new node in proc path + string path = proc_path_ + "/proc-" + to_string(id); + string content = host_addr + "|" + to_string(pid); + if (!zk_.CreateNode(path.c_str(), content.c_str(), ZOO_EPHEMERAL, + nullptr)) { + return -1; + } + return id; +} + +std::string ZKClusterRT::GetProcHost(int proc_id) { + char val[kZKBufSize]; + // construct file name + string path = proc_path_ + "/proc-" + to_string(proc_id); + if (!zk_.GetNode(path.c_str(), val)) return ""; + int len = strlen(val) - 1; + while (len && val[len] != '|') --len; + CHECK(len); + val[len] = '\0'; + return string(val); +} + +bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) { + CHECK_NOTNULL(fn); + string path = groupPath(gid); + // create zk node + if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false; + vector child; + // store the callback function and context for later usage + RTCallback *cb = new RTCallback; + cb->fn = fn; + cb->ctx = ctx; + cb_vec_.push_back(cb); + // start to watch on the zk node, does not care about the first return value + return zk_.WGetChild(path.c_str(), &child, cb); +} + +bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) { + string path = groupPath(s_group) + workerPath(gid, wid); + // try to create an ephemeral node under server group path + return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr); +} + +bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) { + string path = groupPath(s_group) + workerPath(gid, wid); + return zk_.DeleteNode(path.c_str()); +} + +} // namespace singa