singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [5/6] incubator-singa git commit: SINGA-8 Implement distributed Hogwild
Date Thu, 25 Jun 2015 13:45:03 GMT
SINGA-8 Implement distributed Hogwild

Fix a bug in parameter synchronization among server groups.
The inter-process dealer cannot send messages to other processes if the
endpoint is a hostname, e.g., "blob-pc".
Replaced the hostname with the host IP in binding/connecting endpoints. However,
the GetHostIP method is specific to Linux.
Another issue is the synchronization frequency. Currently, the stub triggers one
sync reminder every time its poller expires. If the expiration time is large, the
reminder is rarely triggered; if it is small, many reminder messages are
triggered. TODO: tune the trigger.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4956d6a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4956d6a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4956d6a0

Branch: refs/heads/master
Commit: 4956d6a031de16811e4585b9c28b9ab29c33ab76
Parents: 884b9d7
Author: wang wei <wangwei@comp.nus.edu.sg>
Authored: Thu Jun 25 20:53:51 2015 +0800
Committer: wang wei <wangwei@comp.nus.edu.sg>
Committed: Thu Jun 25 20:53:51 2015 +0800

----------------------------------------------------------------------
 include/utils/cluster.h |  7 ++++---
 include/utils/common.h  |  1 +
 src/trainer/server.cc   | 10 ++++++++--
 src/trainer/trainer.cc  | 12 ++++++++++--
 src/utils/cluster.cc    |  4 +---
 src/utils/common.cc     | 32 ++++++++++++++++++++++++++++++++
 src/utils/param.cc      | 22 +++++++++++++---------
 7 files changed, 69 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 55b10a9..e5980ca 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -6,6 +6,7 @@
 #include <memory>
 #include <vector>
 #include <unordered_map>
+#include "utils/common.h"
 #include "proto/cluster.pb.h"
 #include "utils/cluster_rt.h"
 
@@ -123,8 +124,8 @@ class Cluster {
   }
 
   int ProcsIDOf(int group_id, int id, int flag);
-  const string hostname() const {
-    return hostname_;
+  const string hostip() const {
+    return hostip_;
   }
   void Register(const string& endpoint);
 
@@ -136,7 +137,7 @@ class Cluster {
  private:
   int procs_id_;
   int nprocs_;
-  string hostname_;
+  string hostip_;
   std::vector<std::string> endpoints_;
   // cluster config proto
   ClusterProto cluster_;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/include/utils/common.h
----------------------------------------------------------------------
diff --git a/include/utils/common.h b/include/utils/common.h
index aca35ec..d3c23a8 100644
--- a/include/utils/common.h
+++ b/include/utils/common.h
@@ -32,6 +32,7 @@ int LeastCommonMultiple(int a, int b);
 inline float rand_real(){
   return  static_cast<float>(rand())/(RAND_MAX+1.0f);
 }
+const std::string GetHostIP();
 
 class Metric {
  public:

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 9ea4509..14b25da 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -70,7 +70,12 @@ void Server::Run(){
         if(syncEntry>=master_params.size())
           continue;
         auto param=master_params.at(syncEntry);
-        if(param->local_version()!=param->version()){
+        // Control the synchronization frequency: a slice is synced only when
+        // its local_version and version differ by at least nserver_groups,
+        // i.e. it was updated by local or remote workers that many times.
+        // TODO may optimize the trigger condition.
+        if(abs(param->local_version()-param->version())>=cluster->nserver_groups()){
+          // TODO replace the argument (0,0) to sync a chunk instead of a slice
           sync=param->GenSyncMsg(0,0);
           for(int i=0;i<cluster->nserver_groups();i++){
             if(i!=group_id_) {
@@ -82,7 +87,7 @@ void Server::Run(){
               tmp->set_src(group_id_, server_id_, kServer);
               dealer_->Send(&tmp);
               param->set_version(param->local_version());
-              //DLOG(ERROR)<<"sync";
+              //LOG(ERROR)<<"sync slice="<<param->id()<<" to procs "<<i;
             }
           }
           syncEntry=(syncEntry+1)%master_params.size();
@@ -172,6 +177,7 @@ Msg* Server::HandleSyncRequest(shared_ptr<Param> param, Msg **msg){
   CHECK_EQ((*msg)->frame_size(), param->size()*sizeof(float));
   Tensor<cpu, 1> tmp(static_cast<float*>((*msg)->frame_data()), shape);
   Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
+  //LOG(ERROR)<<"Recv sync for "<<param->id();
   if(slice2group_[param->id()]==group_id_){
     cur+=tmp;
     param->set_local_version(param->local_version()+1);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 4e7f932..58e568e 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -257,8 +257,9 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto&
cproto,
   router_=make_shared<Router>();
   router_->Bind(kInprocRouterEndpoint);
   if(cluster->nprocs()>1){
-    int port=router_->Bind("tcp://127.0.0.1:*");
-    cluster->Register(cluster->hostname()+":"+std::to_string(port));
+    const string hostip=cluster->hostip();
+    int port=router_->Bind("tcp://"+hostip+":*");
+    cluster->Register(hostip+":"+std::to_string(port));
   }else
     cluster->set_procs_id(0);
 
@@ -312,6 +313,9 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
   poll.Add(router_.get());
   int sync_server=0, nworkers=workers.size(), nservers=servers.size();
   while(!stop){
+    // If the poll time is large, the poller may rarely expire (few sync
+    // reminders); if it is small, many reminder messages are sent, which may
+    // slow down the processing of other requests. TODO tune it.
     auto *sock=poll.Wait(cluster->poll_time());
     if(poll.Terminated()){
       LOG(ERROR)<<"Connection broken!";
@@ -411,6 +415,8 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers,
         }else{
           dst_procs_id=cluster->ProcsIDOf(msg->dst_first(),
               msg->dst_second(), msg->dst_flag());
+          //if(type==kSync)
+          //  LOG(ERROR)<<msg->dst_first()<<","<<msg->dst_second()<<","<<dst_procs_id;
         }
         if(dst_procs_id!=procs_id_){
           // forward to other procs
@@ -474,6 +480,8 @@ const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo>
pi, Msg** msg){
     for(int idx=0, id=param->slice_start();idx<param->num_slices();idx++){
       int server=slice2server_[id+idx];
       int procs=Cluster::Get()->ProcsIDOf(group, server, kServer);
+      //if(procs!=procs_id_)
+      //  LOG(ERROR)<<"Copy for update";
       auto x=param->GenGetMsg(procs!=procs_id_, idx);
       x->set_trgt(param->owner(), id+idx, param->local_version()+1);
       x->set_src(procs_id_, gid, kStub);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index bf423b4..706d2ef 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -51,9 +51,7 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
   rt->Init();
   cluster_rt_=shared_ptr<ClusterRuntime>(static_cast<ClusterRuntime*>(rt));
 
-  char buf[128];
-  gethostname(buf, 128);
-  hostname_=string(buf);
+  hostip_=GetHostIP();
 }
 
 void Cluster::Register(const string& endpoint){

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/src/utils/common.cc
----------------------------------------------------------------------
diff --git a/src/utils/common.cc b/src/utils/common.cc
index ed94856..a3242aa 100644
--- a/src/utils/common.cc
+++ b/src/utils/common.cc
@@ -6,6 +6,16 @@
 #include <google/protobuf/text_format.h>
 #include <google/protobuf/io/zero_copy_stream_impl.h>
 #include <stdarg.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <netinet/in.h>
+#include <net/if.h>
+#include <arpa/inet.h>
+
+
 
 namespace singa {
 
@@ -106,4 +116,26 @@ int LeastCommonMultiple(int a, int b)
 
   return temp ? (a / temp * b) : 0;
 }
+
+// IPv4 address of interface "eth0" (Linux-only); "127.0.0.1" if unavailable.
+const std::string GetHostIP(){
+  const std::string fallback = "127.0.0.1";
+  struct ifreq ifr;
+  memset(&ifr, 0, sizeof(ifr));  // keeps ifr_name NUL-terminated
+  ifr.ifr_addr.sa_family = AF_INET;  // request an IPv4 address
+  strncpy(ifr.ifr_name, "eth0", IFNAMSIZ-1);
+
+  int fd = socket(AF_INET, SOCK_DGRAM, 0);
+  // On failure ifr_addr holds no address, so it must never be formatted.
+  int ret = (fd < 0) ? -1 : ioctl(fd, SIOCGIFADDR, &ifr);
+  if (fd >= 0)
+    close(fd);
+  if (ret < 0) {
+    LOG(ERROR) << "Cannot get the IP of eth0, use " << fallback;
+    return fallback;
+  }
+  string ip(inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr));
+  LOG(INFO)<<"Host IP="<<ip;
+  return ip;
+}
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4956d6a0/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
index ac3a6bb..1e05ab9 100644
--- a/src/utils/param.cc
+++ b/src/utils/param.cc
@@ -108,7 +108,7 @@ Msg* Param::GenGetMsg(bool copy, int idx){
   CHECK_LT(idx, num_slices_);
   Msg* msg=new Msg();
   msg->set_type(kGet);
-  char buf[8]; sprintf(buf, " %c ", copy);
+  char buf[8]; sprintf(buf, " %d ", copy);
   msg->add_frame(buf, sizeof(buf));
   pending_get_[idx]=true;
   num_pending_requests_++;
@@ -119,11 +119,13 @@ Msg* Param::GenUpdateMsg(bool copy, int idx){
   CHECK_LT(idx, num_slices_);
   Msg* msg=new Msg();
   msg->set_type(kUpdate);
-  char buf[8]; sprintf(buf, " %c ", copy);
+  char buf[8]; sprintf(buf, " %d ", copy);
   msg->add_frame(buf, sizeof(buf));
   void* ptr=grad_.mutable_cpu_data()+slice_offset_[idx];
-  if(copy)
+  if(copy){
+    //LOG(ERROR)<<"Copy in gen update";
     msg->add_frame(ptr, slice_size_[idx]*sizeof(float));
+  }
   else{ // to share values of grad blob
     char buf[32]; sprintf(buf, " %p ", ptr);
     msg->add_frame(buf, strlen(buf));
@@ -166,7 +168,7 @@ Msg* Param::HandlePutMsg(Msg** msg){
 }
 
 Msg* Param::HandleGetMsg(Msg** msg){
-  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %c ", &copy);
+  int copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy)
     (*msg)->add_frame(mutable_cpu_data(), sizeof(float)*size());
@@ -177,9 +179,10 @@ Msg* Param::HandleGetMsg(Msg** msg){
 }
 
 int Param::ParseUpdateMsg(Msg** msg){
-  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %c ", &copy);
+  int copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy){
+    //LOG(ERROR)<<"Copy in parse update";
     CHECK((*msg)->frame_size());
     memcpy(mutable_cpu_grad(), (*msg)->frame_data(),(*msg)->frame_size());
   }else {// use the same data field of the grad blob
@@ -194,10 +197,12 @@ int Param::ParseUpdateMsg(Msg** msg){
 Msg* Param::GenUpdateResponseMsg(bool copy){
   Msg* msg=new Msg();
   msg->set_type(kRUpdate);
-  char buf[8]; sprintf(buf, " %c ", copy);
+  char buf[8]; sprintf(buf, " %d ", copy);
   msg->add_frame(buf, sizeof(buf));
-  if(copy)
+  if(copy){
+    //LOG(ERROR)<<"Copy in gen";
     msg->add_frame(mutable_cpu_data(), size()*sizeof(float));
+  }
   return msg;
 }
 
@@ -226,10 +231,9 @@ int Param::ParseUpdateResponseMsg(Msg **msg, int slice_idx){
 }
 
 void Param::ParseResponseMsg(Msg** msg, int slice_idx){
-  char copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %c ", &copy);
+  int copy; sscanf(static_cast<char*>((*msg)->frame_data()), " %d ", &copy);
   (*msg)->next_frame();
   if(copy){
-    LOG(ERROR)<<"copy";
     CHECK((*msg)->frame_size());
     memcpy(mutable_cpu_data()+slice_offset_[slice_idx],
         (*msg)->frame_data(), (*msg)->frame_size());


Mime
View raw message