singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [2/3] incubator-singa git commit: SINGA-19 Slice large Param objects for load-balance
Date Tue, 23 Jun 2015 14:14:04 GMT
SINGA-19 Slice large Param objects for load-balance

rebased to latest master and tested with multiple servers


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/c635cc61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/c635cc61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/c635cc61

Branch: refs/heads/master
Commit: c635cc61e82b7419ad5aefd7dd643d037cd25737
Parents: e0a52a6
Author: wang wei <wangwei@comp.nus.edu.sg>
Authored: Tue Jun 23 20:09:33 2015 +0800
Committer: wang wei <wangwei@comp.nus.edu.sg>
Committed: Tue Jun 23 20:24:33 2015 +0800

----------------------------------------------------------------------
 include/communication/msg.h | 17 ++++++-----
 src/communication/msg.cc    |  8 ++---
 src/trainer/server.cc       | 31 +++++++++++---------
 src/trainer/trainer.cc      | 63 +++++++++++++++++++++++++---------------
 src/trainer/worker.cc       |  8 ++---
 5 files changed, 76 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c635cc61/include/communication/msg.h
----------------------------------------------------------------------
diff --git a/include/communication/msg.h b/include/communication/msg.h
index b83c738..e63c3cf 100644
--- a/include/communication/msg.h
+++ b/include/communication/msg.h
@@ -39,12 +39,14 @@ class Msg {
   inline void SwapAddr() { std::swap(src_, dst_); }
   inline void set_type(int type) { type_ = type; }
   inline int type() const { return type_; }
-  inline void set_target(int first, int second) {
-    target_first_ = first;
-    target_second_ = second;
+  inline void set_trgt(int first, int second, int third) {
+    trgt_first_ = first;
+    trgt_second_ = second;
+    trgt_third_ = third;
   }
-  inline int target_first() const { return target_first_; }
-  inline int target_second() const { return target_second_; }
+  inline int trgt_first() const { return trgt_first_; }
+  inline int trgt_second() const { return trgt_second_; }
+  inline int trgt_third() const { return trgt_third_; }
  /**
    * Copy src and dst address, including first, id, flag
    */
@@ -84,8 +86,9 @@ class Msg {
   int src_ = 0;
   int dst_ = 0;
   int type_ = 0;
-  int target_first_ = 0;
-  int target_second_ = 0;
+  int trgt_first_ = 0;
+  int trgt_second_ = 0;
+  int trgt_third_ = 0;
 #ifdef USE_ZMQ
   zmsg_t* msg_ = nullptr;
   zframe_t *frame_ = nullptr;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c635cc61/src/communication/msg.cc
----------------------------------------------------------------------
diff --git a/src/communication/msg.cc b/src/communication/msg.cc
index 2a22b05..4f41077 100644
--- a/src/communication/msg.cc
+++ b/src/communication/msg.cc
@@ -31,15 +31,15 @@ bool Msg::next_frame() {
 
 void Msg::ParseFromZmsg(zmsg_t* msg) {
   char* tmp = zmsg_popstr(msg);
-  sscanf(tmp, "%d %d %d %d %d",
-         &src_, &dst_, &type_, &target_first_, &target_second_);
+  sscanf(tmp, "%d %d %d %d %d %d",
+         &src_, &dst_, &type_, &trgt_first_, &trgt_second_, &trgt_third_);
   frame_ = zmsg_next(msg);
   msg_ = msg;
 }
 
 zmsg_t* Msg::DumpToZmsg() {
-  zmsg_pushstrf(msg_, "%d %d %d %d %d",
-                src_, dst_, type_, target_first_, target_second_);
+  zmsg_pushstrf(msg_, "%d %d %d %d %d %d",
+      src_, dst_, type_, trgt_first_, trgt_second_, trgt_third_);
   zmsg_t *tmp = msg_;
   msg_ = nullptr;
   return tmp;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c635cc61/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 36b04b6..5662258 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -50,7 +50,7 @@ void Server::Run(){
       CHECK_STREQ("PONG", pong.c_str());
       delete msg;
     }else if(type==kPut){
-      int pid=msg->target_first();
+      int pid=msg->trgt_second();
       shared_ptr<Param> param=nullptr;
       if(shard_->find(pid)!=shard_->end()){
         LOG(ERROR)<<"Param ("<<pid<<") is put more than once";
@@ -61,16 +61,14 @@ void Server::Run(){
         param->set_id(pid);
         (*shard_)[pid]=param;
       }
-      param->HandlePutMsg(&msg);
+      HandlePut(param, &msg);
     }else{
-      int pid=msg->target_first();
+      int pid=msg->trgt_second();
       if(shard_->find(pid)==shard_->end()){
         // delay the processing by re-queue the msg.
         response=msg;
+        DLOG(ERROR)<<"Requeue msg";
       } else{
-        CHECK(shard_->find(pid)!=shard_->end()) <<"Param ("<<pid
-          <<") is not maintained by server ("
-          <<group_id_ <<", " <<server_id_<<")";
         auto param=shard_->at(pid);
         switch (type){
           case kGet:
@@ -80,7 +78,6 @@ void Server::Run(){
             response = HandleUpdate(param, &msg);
             break;
           case kSyncRequest:
-            VLOG(3)<<"Handle SYNC-REQUEST";
             response = HandleSyncRequest(param, &msg);
             break;
         }
@@ -93,25 +90,33 @@ void Server::Run(){
 }
 
 void Server::HandlePut(shared_ptr<Param> param, Msg **msg){
+  int version=(*msg)->trgt_third();
   param->HandlePutMsg(msg);
+  // must set version after HandlePutMsg which allocates the memory
+  param->set_version(version);
 }
 
 Msg* Server::HandleGet(shared_ptr<Param> param, Msg **msg){
-  return param->HandleGetMsg(msg);
+  if(param->version()<(*msg)->trgt_third())
+    return *msg;
+  else{
+    auto reply= param->HandleGetMsg(msg);
+    int paramid=reply->trgt_first(), slice=reply->trgt_second();
+    reply->set_trgt(paramid, slice, param->version());
+  }
 }
 
 Msg* Server::HandleUpdate(shared_ptr<Param> param, Msg **msg) {
-  //repsonse of the format: <identity><type: kData><paramId><param content>
   auto* tmp=static_cast<Msg*>((*msg)->CopyAddr());
   tmp->SwapAddr();
-  int paramid=(*msg)->target_first();
-  int sliceid=(*msg)->target_second();
-  int step=(*msg)->target_third();
+  int paramid=(*msg)->trgt_first();
+  int sliceid=(*msg)->trgt_second();
+  int step=(*msg)->trgt_third();
   bool copy=param->ParseUpdateMsg(msg);
   updater_->Update(step, param);
   param->set_version(param->version()+1);
   auto response=param->GenUpdateResponseMsg(copy);
-  response->set_target(paramid, sliceid, param->version());
+  response->set_trgt(paramid, sliceid, param->version());
   response->SetAddr(tmp);
   delete tmp;
   return response;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c635cc61/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 989a020..11499db 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -37,11 +37,11 @@ const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num,
     if(x->owner()==x->id())
       avg+=x->size();
   }
-  avg/=num;
+  avg=avg/num+avg%num;
   int diff=avg/10;
   LOG(INFO)<<"Slicer, param avg="<<avg<<", diff= "<<diff;
 
-  int capacity=avg, sliceid=0;
+  int capacity=avg, sliceid=0, nbox=0;
   std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
   for(auto& param: params){
     if(param->id()!=param->owner())
@@ -50,20 +50,22 @@ const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num,
     LOG(INFO)<<"param id="<<paramid<<", total size="<<x;
     while(x>0){
       int size=0;
-      if(capacity>x){
+      if(capacity>=x){
         capacity-=x;
         size=x;
         x=0;
-      }else if(capacity+diff>x){
-        capacity=avg;
+      }else if(capacity+diff>=x){
         size=x;
         x=0;
-      }else if(capacity>diff){
+        capacity=0;
+      }else if(capacity>=diff){
         x-=capacity;
         size=capacity;
         capacity=avg;
+        nbox++;
       }else{
         capacity=avg;
+        nbox++;
       }
       if(size){
         paramid2slices[paramid].push_back(std::make_pair(sliceid++, size));
@@ -71,29 +73,42 @@ const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num,
       }
     }
   }
+  CHECK_LE(nbox, num);
   return paramid2slices;
 }
 const vector<int> PartitionSlice(int num, const vector<int>& slices){
   int avg=0;
   for(int x: slices)
     avg+=x;
-  avg/=num;
+  avg=avg/num+avg%num;
   int box=avg, boxid=0, diff=avg/10;
   vector<int> slice2box;
-  for(int x: slices){
+  for(auto it=slices.begin(); it!=slices.end();){
+    int x=*it;
     if(box>=x){
       box-=x;
       slice2box.push_back(boxid);
+      it++;
     }else if(box+diff>=x){
       slice2box.push_back(boxid);
-      box=avg;
-      boxid++;
+      it++;
+      box=0;
     }else{
       box=avg;
       boxid++;
     }
   }
-  CHECK_LE(boxid, num);
+//  CHECK_LT(slice2box.back(), num);
+  CHECK_EQ(slice2box.size(), slices.size());
+  int previd=slice2box[0];
+  std::string disp;
+  for(size_t i=0;i<slice2box.size();i++)
+    if(previd!=slice2box[i]){
+      disp+=", "+std::to_string(slices[i]);
+      previd=slice2box[i];
+    } else
+      disp+=" "+std::to_string(slices[i]);
+  LOG(INFO)<<"partition slice (av ="<<avg<<", num="<<num<<"):"<<disp;
   return slice2box;
 }
 vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads,
@@ -298,7 +313,7 @@ void Trainer::Run(int nworkers, int nservers){
           }
         }else if(type==kMetric){
           if(msg->src_first()==0){
-            int step=msg->target_first();
+            int step=msg->trgt_first();
             string prefix((char*)msg->frame_data(), msg->frame_size());
             msg->next_frame();
             Metric cur;
@@ -308,7 +323,7 @@ void Trainer::Run(int nworkers, int nservers){
           DeleteMsg(&msg);
         }else if(cluster->nserver_groups()>0){
           int group_id;
-          int paramid=msg->target_first();
+          int paramid=msg->trgt_first();
           shared_ptr<ParamInfo> entry;
           switch (type){ // TODO process other requests, e.g. RESTful
             case kUpdate:
@@ -378,7 +393,7 @@ Msg* Trainer::HandleConnect(Msg** msg){
 const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){
   Msg* msgg=*msg;
   vector<Msg*> replies;
-  int version=msgg->target_second();
+  int version=msgg->trgt_third();
   if(msgg->src_flag()==kStub){
     if(version<=pi->shares.at(0)->version()){
       pi->shares.at(0)->HandleGetMsg(msg);
@@ -394,7 +409,7 @@ const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){
       int server=slice2server_[id+idx];
       int procs=Cluster::Get()->ProcsIDOf(group, server, kServer);
       auto x=param->GenGetMsg(procs!=procs_id_, idx);
-      x->set_target(param->owner(), id+idx, param->local_version()+1);
+      x->set_trgt(param->owner(), id+idx, param->local_version()+1);
       x->set_src(procs_id_, gid, kStub);
       x->set_dst(group, server, kServer);
       replies.push_back(x);
@@ -406,7 +421,7 @@ const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){
 const vector<Msg*> Trainer::HandleUpdate(shared_ptr<ParamInfo>pi, Msg** msg){
   Msg* msgg=*msg ;
   vector<Msg*> ret;
-  int step= msgg->target_second();
+  int step= msgg->trgt_third();
   if(msgg->src_flag()==kStub){
     if(pi->num_update<pi->num_local){
       ret.push_back(*msg);
@@ -448,7 +463,7 @@ const vector<Msg*> Trainer::HandleUpdate(shared_ptr<ParamInfo>pi, Msg** msg){
       int server=slice2server_[idx+id];
       int procs=Cluster::Get()->ProcsIDOf(group, server, kServer);
       auto x=param->GenUpdateMsg(procs!=procs_id_, idx);
-      x->set_target(param->owner(), id+idx, step);
+      x->set_trgt(param->owner(), id+idx, step);
       x->set_src(procs_id_, srcgid, kStub);
       x->set_dst(group, server, kServer);
       ret.push_back(x);
@@ -463,13 +478,14 @@ const vector<Msg*> Trainer::HandlePut(shared_ptr<ParamInfo>pi, Msg** msg){
   vector<Msg*> ret;
   CHECK_NE((*msg)->src_flag(), kStub);
   int gid=(*msg)->src_first();
+  int version=(*msg)->trgt_third();
   auto param=pi->shares.at(0);
   int group=gid/Cluster::Get()->nworker_groups_per_server_group();
   for(int idx=0, start=param->slice_start();idx<param->num_slices(); idx++){
     int server=slice2server_[start+idx];
     int procs=Cluster::Get()->ProcsIDOf(group, server, kServer);
     auto x=param->GenPutMsg(procs!=procs_id_, idx);
-    x->set_target(param->owner(), start+idx, param->version());
+    x->set_trgt(param->owner(), start+idx, version);
     x->set_src(procs_id_, gid, kStub);
     x->set_dst(group, server, kServer);
     ret.push_back(x);
@@ -479,8 +495,8 @@ const vector<Msg*> Trainer::HandlePut(shared_ptr<ParamInfo>pi, Msg** msg){
 }
 
 void Trainer::HandleGetResponse(shared_ptr<ParamInfo>pi, Msg** msg){
-  int version=(*msg)->target_third();
-  int sliceid=(*msg)->target_second();
+  int version=(*msg)->trgt_third();
+  int sliceid=(*msg)->trgt_second();
   auto param=pi->shares.at(0);
   if(param->ParseGetResponseMsg(msg,sliceid-param->slice_start()))
     param->set_version(version);
@@ -489,10 +505,11 @@ void Trainer::HandleGetResponse(shared_ptr<ParamInfo>pi, Msg** msg){
 
 
 void Trainer::HandleUpdateResponse(shared_ptr<ParamInfo> pi, Msg** msg){
-  int version=(*msg)->target_third();
-  int sliceid=(*msg)->target_second();
+  int sliceid=(*msg)->trgt_second();
+  int version=(*msg)->trgt_third();
   auto param=pi->shares.at(0);
-  if(param->ParseUpdateResponseMsg(msg,sliceid-param->slice_start()))
+  if(param->ParseUpdateResponseMsg(msg,sliceid-param->slice_start())){
     param->set_version(version);
+  }
 }
 } /* singa */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c635cc61/src/trainer/worker.cc
----------------------------------------------------------------------
diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc
index 0835bbb..8ecb146 100644
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@ -103,7 +103,7 @@ int Worker::Put(shared_ptr<Param> param, int step){
   msg->set_src(group_id_, worker_id_, kWorkerParam);
   msg->set_dst(-1, -1, kStub);
   msg->set_type(kPut);
-  msg->set_target(param->owner(), step);
+  msg->set_trgt(param->owner(), 0, step);
   dealer_->Send(&msg);
   return 1;
 }
@@ -112,7 +112,7 @@ int Worker::Get(shared_ptr<Param> param, int step){
   msg->set_src(group_id_, worker_id_, kWorkerParam);
   msg->set_dst(-1, -1, kStub);
   msg->set_type(kGet);
-  msg->set_target(param->owner(), step);
+  msg->set_trgt(param->owner(), 0, step);
   dealer_->Send(&msg);
   return 1;
 }
@@ -126,7 +126,7 @@ int Worker::Update(shared_ptr<Param> param, int step){
     msg->set_src(group_id_, worker_id_, kWorkerParam);
     msg->set_dst(-1, -1, kStub);
     msg->set_type(kUpdate);
-    msg->set_target(param->owner(), step);
+    msg->set_trgt(param->owner(), 0, step);
     dealer_->Send(&msg);
   }
   return 1;
@@ -153,7 +153,7 @@ const void Worker::DisplayPerformance(const Metric & perf, const string& prefix)
   msg->set_src(group_id_, worker_id_, kWorkerParam);
   msg->set_dst(-1,-1, kStub);
   msg->set_type(kMetric);
-  msg->set_target(step_,0);
+  msg->set_trgt(step_,0,0);
   const string disp=perf.ToString();
   msg->add_frame(prefix.c_str(), prefix.length());
   msg->add_frame(disp.c_str(), disp.length());


Mime
View raw message