From: wangsh@apache.org
To: commits@singa.incubator.apache.org
Reply-To: dev@singa.incubator.apache.org
Date: Thu, 25 Jun 2015 13:45:02 -0000
Message-Id: <74391fae85974542a92183e76821a02d@git.apache.org>
In-Reply-To: <097c680d31de4766bc86a6b701711481@git.apache.org>
References: <097c680d31de4766bc86a6b701711481@git.apache.org>
Mailing-List: contact commits-help@singa.incubator.apache.org; run by ezmlm
X-Mailer: ASF-Git Admin Mailer
Subject: [4/6] incubator-singa git commit: SINGA-8 Implement distributed Hogwild

SINGA-8 Implement distributed Hogwild

The original Param objects are sliced so that the parameters mastered by
each server group are (roughly) equal in size. Following Caffe's
implementation, we let each server group master a subset of param slices.
Each server group updates all model parameters for its corresponding
worker groups and synchronizes with the other server groups on the slices
it masters.

Tested on a single node with multiple processes, each of which has one
server group with one server and one worker group with one worker.
The training loss decreases more slowly than with shared-memory Hogwild.

TODO: optimize and test on multiple nodes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/884b9d70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/884b9d70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/884b9d70

Branch: refs/heads/master
Commit: 884b9d70a631bee4961fb3907e47a747c5dd2b89
Parents: ad13d03
Author: wang wei
Authored: Thu Jun 25 11:39:45 2015 +0800
Committer: wang wei
Committed: Thu Jun 25 11:50:28 2015 +0800

----------------------------------------------------------------------
 examples/cifar10/model.conf    |  1 +
 include/communication/socket.h |  2 +-
 include/trainer/trainer.h      |  3 +-
 include/utils/cluster.h        | 10 ++---
 include/utils/param.h          |  5 +--
 src/communication/msg.cc       |  4 +-
 src/communication/socket.cc    |  2 +-
 src/proto/cluster.proto        |  1 -
 src/proto/common.proto         |  1 +
 src/trainer/server.cc          | 79 ++++++++++++++++++-------------------
 src/trainer/trainer.cc         | 37 +++++++++--------
 src/utils/cluster.cc           | 15 +++----
 src/utils/param.cc             | 14 +++----
 13 files changed, 83 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
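The slicing described in the commit message can be illustrated with a small standalone sketch. It is not taken from the SINGA sources: `SliceParams`, `AssignSlices`, the fixed `slice_len`, and the greedy balancing rule are all assumptions chosen for illustration. Only the idea of a slice-to-group table (the diff uses a member named `slice2group_`) comes from the commit itself.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not SINGA code): cut every parameter into slices of at
// most `slice_len` floats and return the slice sizes in order.
std::vector<int> SliceParams(const std::vector<int>& param_sizes, int slice_len) {
  assert(slice_len > 0);
  std::vector<int> slices;
  for (int size : param_sizes)
    for (int off = 0; off < size; off += slice_len)
      slices.push_back(std::min(slice_len, size - off));
  return slices;
}

// Hypothetical helper (not SINGA code): give each slice to the server group
// that currently masters the fewest floats; ties go to the lowest group id.
std::vector<int> AssignSlices(const std::vector<int>& slices, int ngroups) {
  assert(ngroups > 0);
  std::vector<long> load(ngroups, 0);
  std::vector<int> slice2group(slices.size());
  for (std::size_t i = 0; i < slices.size(); ++i) {
    int best = static_cast<int>(
        std::min_element(load.begin(), load.end()) - load.begin());
    slice2group[i] = best;
    load[best] += slices[i];
  }
  return slice2group;
}
```

With two parameters of 10 and 3 floats and `slice_len` 4, the slices have sizes 4, 4, 2 and 3. Two server groups then master 6 and 7 floats respectively, which is the "(roughly) equal" balance the commit message aims for.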
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/examples/cifar10/model.conf
----------------------------------------------------------------------
diff --git a/examples/cifar10/model.conf b/examples/cifar10/model.conf
index 42be6dd..2bf76b0 100644
--- a/examples/cifar10/model.conf
+++ b/examples/cifar10/model.conf
@@ -25,6 +25,7 @@ layer{
   sharddata_conf {
     path: "examples/cifar10/cifar10_train_shard"
     batchsize: 16
+    random_skip: 5000
   }
   exclude: kTest
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/include/communication/socket.h
----------------------------------------------------------------------
diff --git a/include/communication/socket.h b/include/communication/socket.h
index b98656e..5a9598c 100644
--- a/include/communication/socket.h
+++ b/include/communication/socket.h
@@ -59,7 +59,7 @@ class Poller {
   /**
     * @return true if the poller is terminated due to process interupt
     */
-  virtual bool Terminated()=0;
+  virtual bool Terminated();
  protected:
 #ifdef USE_ZMQ

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index fb716bc..31d3704 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -101,8 +101,7 @@ class Trainer{
       const ModelProto& mproto, vector *slice_size);
   void Run(const vector>& workers,
-      const vector>& servers,
-      const std::map>& shards);
+      const vector>& servers);
   /**
    * Register default implementations for all base classes used in the system,
    * e.g., the Updater, BaseMsg, etc.
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 3830383..55b10a9 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -32,7 +32,10 @@ class Cluster {
   int nworkers_per_procs()const{return cluster_.nworkers_per_procs();}
   int nservers_per_procs()const{return cluster_.nservers_per_procs();}
   int nworker_groups_per_server_group() const {
-    return cluster_.nworker_groups()/cluster_.nserver_groups();
+    if(nserver_groups()==0||nservers_per_group()==0)
+      return 1;
+    else
+      return cluster_.nworker_groups()/cluster_.nserver_groups();
   }
   /**
@@ -49,10 +52,7 @@ class Cluster {
    * @return true if the calling procs has worker threads.
    */
   bool has_worker()const {
-    if(server_worker_separate()){
-      return procs_id_& shape);
   virtual void Setup(const vector& shape);
   /*
    * Fill the values according to initmethod, e.g., gaussian distribution
    *

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/communication/msg.cc
----------------------------------------------------------------------
diff --git a/src/communication/msg.cc b/src/communication/msg.cc
index 7ee8cad..38512d2 100644
--- a/src/communication/msg.cc
+++ b/src/communication/msg.cc
@@ -11,8 +11,8 @@ Msg::Msg(const Msg& msg){
   src_=msg.src_;
   dst_=msg.dst_;
   type_=msg.type_;
-  target_first_=msg.target_first_;
-  target_second_=msg.target_second_;
+  trgt_first_=msg.trgt_first_;
+  trgt_second_=msg.trgt_second_;
   msg_=zmsg_dup(msg.msg_);
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
index c6925d8..0cb0982 100644
--- a/src/communication/socket.cc
+++ b/src/communication/socket.cc
@@ -90,7 +90,7 @@ Router::~Router() {
     zmsg_destroy(&msg);
   }
 }
-int Router::Bind(std::string endpoint){
+int Router::Bind(const std::string& endpoint){
   int port=-1;
   if(endpoint.length()){
     port=zsock_bind(router_, "%s", endpoint.c_str());

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/proto/cluster.proto
----------------------------------------------------------------------
diff --git a/src/proto/cluster.proto b/src/proto/cluster.proto
index 3317f2a..8fbdbbe 100644
--- a/src/proto/cluster.proto
+++ b/src/proto/cluster.proto
@@ -43,7 +43,6 @@ message ClusterProto {
   optional int32 bandwidth=50 [default=134217728];
   // poll time in milliseconds
   optional int32 poll_time=51 [default =100];
->>>>>>> SINGA-8 Implement distributed Hogwild
 }
 message ServerTopology {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/proto/common.proto
----------------------------------------------------------------------
diff --git a/src/proto/common.proto b/src/proto/common.proto
index 6bc0919..70b743c 100644
--- a/src/proto/common.proto
+++ b/src/proto/common.proto
@@ -13,6 +13,7 @@ enum MsgType {
   kRUpdate = 9;
   kConnect = 10;
   kMetric = 11;
+  kSyncReminder = 12;
 };
 enum EntityType {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
index 9e0dee3..9ea4509 100644
--- a/src/trainer/server.cc
+++ b/src/trainer/server.cc
@@ -7,6 +7,7 @@
 #include "utils/singleton.h"
 #include "utils/factory.h"
 #include "utils/cluster.h"
+#include "proto/common.pb.h"
 using namespace mshadow;
 namespace singa {
@@ -34,8 +35,9 @@ void Server::Run(){
   ping->add_frame("PING", 4);
   ping->set_type(kConnect);
   dealer_->Send(&ping);
-  int syncEntry=0;
-	//start recv loop and process requests
+  vector> master_params;
+  size_t syncEntry=0;
+  //start recv loop and process requests
   while (true){
     Msg* msg=dealer_->Receive();
     if (msg==nullptr)
@@ -53,46 +55,39 @@ void Server::Run(){
       CHECK_STREQ("PONG", pong.c_str());
       DeleteMsg(&msg);
     }else if(type==kPut){
+      int pid = msg->trgt_second();
       response = HandlePut(&msg);
+      if(slice2group_[pid]==group_id_)
+        master_params.push_back(shard_->at(pid));
     }else{
       int pid=msg->trgt_second();
       if(shard_->find(pid)==shard_->end()){
         // delay the processing by re-queue the msg.
         response=msg;
         DLOG(ERROR)<<"Requeue msg";
-    }else if(type==kSyncReminder){
-      DeleteMsg(&msg);
-      unsigned nchecks=0, nparams=shard_->size();
-      while(nchecksat(shard_->at(syncEntry))!=group_id_){
-        syncEntry=(syncEntry+1)%nparams;
-        nchecks++;
-      }
-      if(nchecks==nparams) continue;
-      auto param=shard_->at(syncEntry);
-      if(param->local_version()!=param->version()){
-        sync=param->GenSyncMsg(true);
-        for(int i=0;inserver_groups();i++){
-          if(i!=group_id_) {
-            Msg* tmp=sync;
-            if(inserver_groups()-1)
-              tmp= new Msg(*sync);
-            tmp->set_dst(i, server_locator_->at(param), kServer);
-            tmp->set_src(group_id_, server_id_, kServer);
-            dealer_->Send(&tmp);
-            param->set_version(param->local_version());
-            //DLOG(ERROR)<<"sync";
+      }else if(type == kSyncReminder){
+        DeleteMsg(&msg);
+        if(syncEntry>=master_params.size())
+          continue;
+        auto param=master_params.at(syncEntry);
+        if(param->local_version()!=param->version()){
+          sync=param->GenSyncMsg(0,0);
+          for(int i=0;inserver_groups();i++){
+            if(i!=group_id_) {
+              Msg* tmp=sync;
+              if(inserver_groups()-1)
+                tmp= new Msg(*sync);
+              // assume only one server per group, TODO generalize it
+              tmp->set_dst(i, 0, kServer);
+              tmp->set_src(group_id_, server_id_, kServer);
+              dealer_->Send(&tmp);
+              param->set_version(param->local_version());
+              //DLOG(ERROR)<<"sync";
+            }
           }
+          syncEntry=(syncEntry+1)%master_params.size();
         }
-      }
-    }else {
-      int pid=msg->target_first();
-      if(shard_->find(pid)==shard_->end()){
-        // delay the processing by re-queue the msg.
-        response=msg;
-        LOG(ERROR)<<"Requeue";
->>>>>>> SINGA-8 Implement distributed Hogwild
-      } else{
+      }else{
         auto param=shard_->at(pid);
         switch (type){
           case kGet:
@@ -118,7 +113,7 @@ void Server::Run(){
 Msg* Server::HandlePut(Msg **msg){
   int version=(*msg)->trgt_third();
-  int pid=(*msg)->target_first();
+  int pid=(*msg)->trgt_second();
   shared_ptr param=nullptr;
   if(shard_->find(pid)!=shard_->end()){
     LOG(ERROR)<<"Param ("<>::Instance();
     param=shared_ptr(factory ->Create("Param"));
-    param->set_id(pid);
     (*shard_)[pid]=param;
   }
   auto response=param->HandlePutMsg(msg);
   // must set version after HandlePutMsg which allocates the memory
   param->set_version(version);
+  param->set_local_version(version);
+  param->set_id(pid);
   if(Cluster::Get()->nserver_groups()>1 &&
-      group_locator_->at(param)!=group_id_){
+      slice2group_[pid]!=group_id_){
     last_data_[pid]=std::make_shared>();
     last_data_[pid]->ReshapeLike(param->data());
     last_data_[pid]->CopyFrom(param->data());
   }
-  LOG(INFO)<<"Server put param "<Register(cluster->hostname()+":"+std::to_string(port));
+  }else
+    cluster->set_procs_id(0);
+  procs_id_ = cluster->procs_id();
   int nthreads=1;
   // create workers
   vector slices;
@@ -280,7 +285,7 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto,
     threads.push_back(std::thread(&Server::Run,server.get()));
   for(auto worker: workers)
     threads.push_back(std::thread(&Worker::Run,worker.get()));
-  Run(workers, servers, shards);
+  Run(workers, servers);
   for(auto& thread: threads)
     thread.join();
   for(auto x: ctx)
@@ -292,9 +297,9 @@ inline int bandwidth(int bytes, system_clock::time_point start){
   auto duration=duration_cast (now - start);
   return static_cast(bytes*1000.f/duration.count());
 }
+
 void Trainer::Run(const vector>& workers,
-    const vector>& servers,
-    const std::map>& shards){
+    const vector>& servers){
   auto cluster=Cluster::Get();
   procs_id_=cluster->procs_id();
   LOG(INFO)<<"Stub in process "<>& workers,
   poll.Add(router_.get());
   int sync_server=0, nworkers=workers.size(), nservers=servers.size();
   while(!stop){
-    Socket *sock=poll.Wait(cluster->poll_time());
+    auto *sock=poll.Wait(cluster->poll_time());
     if(poll.Terminated()){
       LOG(ERROR)<<"Connection broken!";
       exit(0);
@@ -321,7 +326,6 @@ void Trainer::Run(const vector>& workers,
         msg->set_type(kSyncReminder);
         sync_server=(sync_server+1)%servers.size();
         router_->Send(&msg);
-        //LOG(ERROR)<<"Reminder";
       }
       continue;
     }
@@ -345,14 +349,13 @@ void Trainer::Run(const vector>& workers,
           nservers--;
         else if (msg->src_flag()==kWorkerParam)
           nworkers--;
-        delete msg;
-        msg=nullptr;
+        DeleteMsg(&msg);
         if(nworkers==0&&nservers==0){
           stop=true;
           break;
         }
       }else if(type==kMetric){
-        if(msg->src_first()==0){
+        if(msg->src_first()>=0){
           int step=msg->trgt_first();
           string prefix((char*)msg->frame_data(), msg->frame_size());
           msg->next_frame();

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 023abbe..bf423b4 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -14,21 +14,16 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) {
   procs_id_=procs_id;
   cluster_ = cluster;
   SetupFolders(cluster);
-  int nprocs;
   if(server_worker_separate())
     nprocs_=nworker_procs()+nserver_procs();
   else
-    nprocs=std::max(nworker_procs(), nserver_procs());
-  CHECK_LT(procs_id, nprocs);
-  if (cluster_.has_nprocs())
-    CHECK_EQ(cluster.nprocs(), nprocs);
-  else
-    cluster_.set_nprocs(nprocs);
-  if(nprocs>1&&procs_id>-1){
+    nprocs_=std::max(nworker_procs(), nserver_procs());
+  CHECK_LT(procs_id, nprocs_);
+  if(nprocs_>1&&procs_id>-1){
     std::ifstream ifs(cluster.hostfile(), std::ifstream::in);
     std::string line;
-    while(std::getline(ifs, line)
-        &&endpoints_.size()(nprocs_)){
+    while(std::getline(ifs, line)&&
+        endpoints_.size()< static_cast(nprocs_)){
       endpoints_.push_back(line);
     }
     CHECK_EQ(endpoints_.size(), nprocs_);
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/884b9d70/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
index 4ad17ce..ac3a6bb 100644
--- a/src/utils/param.cc
+++ b/src/utils/param.cc
@@ -133,11 +133,11 @@ Msg* Param::GenUpdateMsg(bool copy, int idx){
   return msg;
 }
-Msg* Param::GenSyncMsg(bool copy, int v){
+Msg* Param::GenSyncMsg(int offset, int size){
   Msg* msg=new Msg();
   msg->set_type(kSyncRequest);
-  msg->set_target(id(), local_version());
-  msg->add_frame(mutable_cpu_data(), size()*sizeof(float));
+  msg->set_trgt(-1, id(), local_version());
+  msg->add_frame(mutable_cpu_data(), data_->count()*sizeof(float));
   return msg;
 }
@@ -150,9 +150,10 @@ Msg* Param::HandlePutMsg(Msg** msg){
   proto_.set_learning_rate_multiplier(lr);
   proto_.set_weight_decay_multiplier(wc);
   vector shape{size};
-  Setup(shape);
-  set_local_version((*msg)->target_second());
-  set_version((*msg)->target_second());
+  ParamProto proto;
+  Setup(proto, shape);
+  set_local_version((*msg)->trgt_third());
+  set_version((*msg)->trgt_third());
   if(ptr==nullptr){
     CHECK((*msg)->next_frame());
     CHECK_EQ(size* sizeof(float), (*msg)->frame_size());
@@ -205,7 +206,6 @@ Msg* Param::HandleSyncMsg(Msg** msg){
   return nullptr;
 }
-<<<<<<< HEAD
 int Param::ParseSyncResponseMsg(Msg** msg, int slice_idx){
   DeleteMsg(msg);
   return 1;
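
The kSyncReminder branch added to Server::Run in src/trainer/server.cc walks the mastered params round-robin and broadcasts one dirty param per reminder. The sketch below restates that control flow without any messaging so it can be compiled on its own. `MasteredParam`, `OnSyncReminder`, and the `dst_groups` output are hypothetical names introduced here, not SINGA APIs, and the comments on what the two version counters mean are an interpretation of the diff rather than something the commit states.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for singa::Param; only the two counters matter here.
struct MasteredParam {
  int id;
  int version;        // assumed: version last pushed to the other server groups
  int local_version;  // assumed: version after local updates
};

// Handles one sync reminder. Returns the id of the param that was synced, or
// -1 if nothing was sent. `dst_groups` receives the groups a sync message
// would be sent to; a real server would call Send() for each of them.
int OnSyncReminder(std::vector<MasteredParam>* params, std::size_t* entry,
                   int group_id, int ngroups, std::vector<int>* dst_groups) {
  dst_groups->clear();
  if (*entry >= params->size()) return -1;      // nothing mastered yet
  MasteredParam& p = (*params)[*entry];
  if (p.local_version == p.version) return -1;  // clean: the cursor stays put
  for (int g = 0; g < ngroups; ++g)
    if (g != group_id) dst_groups->push_back(g);
  // As in the diff, the version is only bumped when a message was sent.
  if (!dst_groups->empty()) p.version = p.local_version;
  *entry = (*entry + 1) % params->size();       // advance only after a sync
  return p.id;
}
```

As the diff reads, the cursor advances only when the current param was out of date, so a clean param at the cursor holds up later entries until it changes. Whether that is intended is not stated in the commit message.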