singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wang...@apache.org
Subject [03/22] incubator-singa git commit: Add cluster runtime component implementation based on zookeeper
Date Wed, 27 May 2015 14:39:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/e2b8abe2/src/test/test_cluster.cc
----------------------------------------------------------------------
diff --git a/src/test/test_cluster.cc b/src/test/test_cluster.cc
index fc49e53..bb17149 100644
--- a/src/test/test_cluster.cc
+++ b/src/test/test_cluster.cc
@@ -7,8 +7,22 @@ using namespace singa;
 
 string folder="src/test/data/";
 
-TEST(CluserRuntimeTest, Basic){
-  ClusterRuntime* rt = new ZKClusterRT("localhost:2181");
+// ZooKeeper connection string used by the cluster-runtime tests.
+string host="localhost:2181";
+
+// Context handed to sWatchSGroup(). It must be a mutable buffer: a string
+// literal does not convert to the `void *` context parameter in C++11. It is
+// static because the callback may run asynchronously, after the call returns.
+static char zk_cb_msg[] = "test call back";
+
+// Callback registered on server group 1; logs the context it was given.
+void zk_cb(void *context){
+  LOG(INFO) << "zk callback: " << static_cast<const char *>(context);
+}
+
+// Needs a ZooKeeper server at `host`. Workers (1,1) and (1,2) join server
+// group 1 and then leave it; the emptied group should trigger zk_cb.
+TEST(CluserRuntimeTest, ZooKeeper){
+  ClusterRuntime* rt = new ZKClusterRT(host);
+  ASSERT_TRUE(rt->Init());
+  ASSERT_TRUE(rt->sWatchSGroup(1, 1, zk_cb, zk_cb_msg));
+  ASSERT_TRUE(rt->wJoinSGroup(1, 1, 1));
+  ASSERT_TRUE(rt->wJoinSGroup(1, 2, 1));
+  ASSERT_TRUE(rt->wLeaveSGroup(1, 2, 1));
+  ASSERT_TRUE(rt->wLeaveSGroup(1, 1, 1));
+
+  // Give the asynchronous child watch time to fire before tearing down.
+  sleep(3);
   delete rt;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/e2b8abe2/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index f99d4f5..d075648 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -1,32 +1,153 @@
+#include <zookeeper/zookeeper_log.h>
 #include "utils/cluster_rt.h"
 
+using std::to_string;
+
 namespace singa {
 
 /********* Implementation for ZKClusterRT **************/
 
-
-ZKClusterRT::ZKClusterRT(string host){
-  //fprintf(stderr, "Create ZKClusterRT");
+// Only records the connection parameters; the session is opened by Init().
+ZKClusterRT::ZKClusterRT(string host, int timeout){
+  host_ = host;
+  timeout_ = timeout;
+  zkhandle_ = nullptr;
 }
 
 ZKClusterRT::~ZKClusterRT(){
-  //fprintf(stderr, "Destroy ZKClusterRT");
+  // Close the session before freeing the callback records, because the child
+  // watches armed by sWatchSGroup() hold raw pointers to them.
+  // zkhandle_ is still nullptr if Init() was never called or failed.
+  if (zkhandle_ != nullptr){
+    zookeeper_close(zkhandle_);
+    zkhandle_ = nullptr;
+  }
+  // Free the RTCallback objects that sWatchSGroup() allocated with `new`.
+  for (RTCallback *cb : cb_vec_)
+    delete cb;
+  cb_vec_.clear();
 }
 
 bool ZKClusterRT::Init(){
-  return false;
+  // Creates the ZooKeeper client handle used by all other methods.
+  // Returns false only if the handle could not be created. Session
+  // establishment itself is asynchronous and is reported to watcherGlobal.
+
+  zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
+
+  // watcherGlobal never reads its context argument, so none is passed (a
+  // string literal would not convert to the `void *` context in C++11).
+  zkhandle_ = zookeeper_init(host_.c_str(), watcherGlobal, timeout_, nullptr, nullptr, 0);
+
+  if (zkhandle_ == nullptr){
+    LOG(ERROR) << "Error when connecting to zookeeper servers...";
+    LOG(ERROR) <<"Please ensure zookeeper service is up in host(s):";
+    LOG(ERROR) << host_.c_str();
+    return false;
+  }
+
+  return true;
 }
 
-bool ZKClusterRT::sWatchSGroup(int gid, int sid){
-  return false;
+// Creates every missing persistent node along `path`, like `mkdir -p`.
+// A node that already exists (e.g. created concurrently by another process)
+// is not an error. Returns ZOK on success, otherwise the failing error code.
+static int zkCreatePath(zhandle_t *zh, const string &path){
+  for (size_t pos = path.find('/', 1); ; pos = path.find('/', pos + 1)){
+    const string prefix = path.substr(0, pos);
+    int ret = zoo_create(zh, prefix.c_str(), nullptr, -1, &ZOO_OPEN_ACL_UNSAFE, 0,
+                         nullptr, 0);
+    if (ret == ZOK)
+      LOG(INFO) << "zookeeper node " << prefix << " created";
+    else if (ret != ZNODEEXISTS)
+      return ret;
+    if (pos == string::npos)
+      return ZOK;
+  }
+}
+
+// Watches the children of server group `gid`'s node, creating the node (and
+// any missing ancestors) first. When a child change leaves the group empty,
+// childChanges() invokes fn(ctx) once. `sid` is currently unused.
+// Returns false if the node cannot be created or the watch cannot be armed.
+bool ZKClusterRT::sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx){
+  string path = getSGroupPath(gid);
+  struct Stat stat;
+
+  // Make sure the group node exists. A plain zoo_create() fails with ZNONODE
+  // when a parent such as /singa/status is missing, hence zkCreatePath().
+  int ret = zoo_exists(zkhandle_, path.c_str(), 0, &stat);
+  if (ret == ZNONODE){
+    ret = zkCreatePath(zkhandle_, path);
+    if (ret != ZOK){
+      LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+      return false;
+    }
+  }
+  else if (ret != ZOK){
+    LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
+    return false;
+  }
+
+  // The watcher context must outlive this call, so it is heap-allocated and
+  // tracked in cb_vec_.
+  RTCallback *cb = new RTCallback;
+  cb->fn = fn;
+  cb->ctx = ctx;
+  cb_vec_.push_back(cb);
+
+  // Arm the child watch. The initial child list is not needed, but the client
+  // allocates it, so it must be released.
+  struct String_vector child;
+  ret = zoo_wget_children(zkhandle_, path.c_str(), childChanges, cb, &child);
+  if (ret != ZOK){
+    LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_wget_children)";
+    return false;
+  }
+  deallocate_String_vector(&child);
+  return true;
 }
 
 bool ZKClusterRT::wJoinSGroup(int gid, int wid, int s_group){
+  // Registers worker (gid, wid) under server group s_group as an ephemeral
+  // node. Returns true if the node was created or already exists; false if
+  // the group node is missing or on any other ZooKeeper error.
+  const string node = getSGroupPath(s_group) + getWorkerPath(gid, wid);
+  char created[MAX_BUF_LEN];
+  const int rc = zoo_create(zkhandle_, node.c_str(), nullptr, -1, &ZOO_OPEN_ACL_UNSAFE,
+                            ZOO_EPHEMERAL, created, MAX_BUF_LEN);
+  switch (rc){
+    case ZOK:
+      LOG(INFO) << "zookeeper node " << created << " created";
+      return true;
+    case ZNODEEXISTS:
+      LOG(WARNING) << "zookeeper node " << node << " already exist";
+      return true;
+    case ZNONODE:
+      LOG(ERROR) << "zookeeper parent node " << getSGroupPath(s_group) << " not exist";
+      return false;
+    default:
+      LOG(ERROR) << "Unhandled ZK error code: " << rc << " (zoo_create)";
+      break;
+  }
   return false;
 }
 
 bool ZKClusterRT::wLeaveSGroup(int gid, int wid, int s_group){
+  // Removes worker (gid, wid)'s node from server group s_group. Returns true
+  // if the node was deleted or was already absent, false on any other error.
+  const string node = getSGroupPath(s_group) + getWorkerPath(gid, wid);
+  const int rc = zoo_delete(zkhandle_, node.c_str(), -1);  // -1: skip version check
+  switch (rc){
+    case ZOK:
+      LOG(INFO) << "zookeeper node " << node << " deleted";
+      return true;
+    case ZNONODE:
+      LOG(WARNING) << "try to delete an non-existing zookeeper node " << node;
+      return true;
+    default:
+      LOG(ERROR) << "Unhandled ZK error code: " << rc << " (zoo_delete)";
+      break;
+  }
   return false;
 }
 
+// Global session watcher passed to zookeeper_init() by Init(). It only logs
+// session state transitions; its context argument is not used.
+void ZKClusterRT::watcherGlobal(zhandle_t * zh, int type, int state, const char *path,
+                                void *watcherCtx){
+  if (type != ZOO_SESSION_EVENT) return;
+  if (state == ZOO_CONNECTED_STATE)
+    LOG(INFO) << "Connected to zookeeper service successfully!";
+  else if (state == ZOO_CONNECTING_STATE)
+    LOG(WARNING) << "Lost connection to zookeeper service, reconnecting...";
+  else if (state == ZOO_EXPIRED_SESSION_STATE)
+    // Not recoverable: the ephemeral worker nodes created by wJoinSGroup() are
+    // removed and the handle has to be re-created, so report it as an error.
+    LOG(ERROR) << "zookeeper session expired!";
+}
+
+// Child watcher armed by sWatchSGroup(); watcherCtx is the RTCallback that
+// was allocated there. Watches are one-shot, so every child event re-arms
+// the watch. Once the group has no children the callback runs once, and
+// later events return early without re-arming.
+void ZKClusterRT::childChanges(zhandle_t *zh, int type, int state, const char *path,
+                               void *watcherCtx){
+  // Other event types (e.g. session or node-deleted events) may also reach
+  // this watcher; only child-list changes are acted on.
+  if (type != ZOO_CHILD_EVENT) return;
+
+  RTCallback *cb = static_cast<RTCallback *>(watcherCtx);
+  // Callback already fired: stop watching this group.
+  if (cb->fn == nullptr) return;
+
+  // Fetch the current children and re-arm the watch in a single call.
+  struct String_vector child;
+  int ret = zoo_wget_children(zh, path, childChanges, watcherCtx, &child);
+  if (ret != ZOK){
+    LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_wget_children)";
+    return;
+  }
+  LOG(INFO) << "child.count = " << child.count;
+  const bool empty = (child.count == 0);
+  // The child list is allocated by the client and must be released.
+  deallocate_String_vector(&child);
+  if (empty){
+    // All workers have left the group: run the user callback once.
+    (*cb->fn)(cb->ctx);
+    cb->fn = nullptr;
+  }
+}
+
+// ZooKeeper node that tracks server group `gid`: /singa/status/sg<gid>.
+string ZKClusterRT::getSGroupPath(int gid){
+  string node("/singa/status/sg");
+  node += to_string(gid);
+  return node;
+}
+
+// Worker node name "/g<gid>_w<wid>"; wJoinSGroup()/wLeaveSGroup() append it
+// to a server-group path.
+string ZKClusterRT::getWorkerPath(int gid, int wid){
+  string name("/g");
+  name += to_string(gid);
+  name += "_w";
+  name += to_string(wid);
+  return name;
+}
+
 } // namespace singa


Mime
View raw message