singa-commits mailing list archives

From wang...@apache.org
Subject [5/5] incubator-singa git commit: merge code for data partition within one group on a single node into upstream
Date Sun, 17 May 2015 06:20:03 GMT
merge code for data partition within one group on a single node into upstream


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/f29d93ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/f29d93ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/f29d93ff

Branch: refs/heads/master
Commit: f29d93ff7967b615d546fc525c3f514ce350f689
Parents: 0d47ec5 06f85e2
Author: wang wei <wangwei@comp.nus.edu.sg>
Authored: Sun May 17 14:19:05 2015 +0800
Committer: wang wei <wangwei@comp.nus.edu.sg>
Committed: Sun May 17 14:19:05 2015 +0800

----------------------------------------------------------------------
 README.md                     | 12 ++++++++++++
 examples/cifar10/cluster.conf |  4 ++--
 include/trainer/worker.h      |  2 +-
 src/trainer/trainer.cc        |  2 +-
 src/trainer/worker.cc         |  1 +
 5 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f29d93ff/examples/cifar10/cluster.conf
----------------------------------------------------------------------
diff --cc examples/cifar10/cluster.conf
index 88c3d4b,6b8a8e6..97c64fd
--- a/examples/cifar10/cluster.conf
+++ b/examples/cifar10/cluster.conf
@@@ -1,6 -1,5 +1,6 @@@
  nworker_groups: 1
  nserver_groups: 1
  nservers_per_group: 1
- nworkers_per_group: 2
- nworkers_per_procs: 2
+ nworkers_per_group: 1
++nworkers_per_procs: 1
  workspace: "examples/cifar10/"
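
The cluster.conf above ends up with a single worker per group and per process after the merge; the feature being merged is the ability for one worker group to split its data among several workers on the same node. As a minimal illustration (not SINGA code; record count, variable names, and the slicing scheme are assumptions for this sketch), each worker in a group would train on the contiguous slice of records selected by its worker id:

    #include <cstdio>

    // Illustrative only: given a training set size and nworkers_per_group,
    // each worker in the group takes a contiguous slice keyed by its id
    // (its partition id). Names here are stand-ins, not SINGA's API.
    int main() {
      const int num_records = 50000;     // assumed CIFAR-10 training set size
      const int nworkers_per_group = 2;  // assumed multi-worker setting
      for (int worker_id = 0; worker_id < nworkers_per_group; worker_id++) {
        int per_worker = num_records / nworkers_per_group;
        int start = worker_id * per_worker;
        int end = (worker_id == nworkers_per_group - 1) ? num_records
                                                        : start + per_worker;
        std::printf("worker %d trains on records [%d, %d)\n", worker_id, start, end);
      }
      return 0;
    }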

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f29d93ff/include/trainer/worker.h
----------------------------------------------------------------------
diff --cc include/trainer/worker.h
index afa56ae,09ef49d..ec07210
--- a/include/trainer/worker.h
+++ b/include/trainer/worker.h
@@@ -137,9 -159,10 +137,9 @@@ class Worker 
    void ReceiveBlobs(shared_ptr<NeuralNet> net);
    void SendBlob();
   protected:
-   int thread_id_,group_id_, worker_id_;
+   int thread_id_, group_id_, worker_id_;
    int step_;
    ModelProto modelproto_;
 -  shared_ptr<PMWorker> pmworker_;
    shared_ptr<NeuralNet> train_net_, test_net_, validation_net_;
    shared_ptr<Dealer> layer_dealer_, param_dealer_;
    Poller layer_poller_, param_poller_;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f29d93ff/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --cc src/trainer/trainer.cc
index 35b8f6c,89e97f1..bc6867d
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@@ -53,9 -58,9 +53,9 @@@ void Trainer::Start(const ModelProto& m
      int start=pid*cluster->nservers_per_procs()%cluster->nservers_per_group();
      int end=start+cluster->nservers_per_group();
      // the ParamShard for servers consists of a dictionary of Param objects
 -    auto shard=make_shared<PMServer::ParamShard>();
 +    auto shard=make_shared<Server::ParamShard>();
      for(int sid=start;sid<end;sid++){
-       auto server=make_shared<Server>(nthreads++,gid, sid);
+       auto server=make_shared<Server>(nthreads++, gid, sid);
        server->Setup(mproto.updater(), shard);
        servers.push_back(server);
      }
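
As a worked example of the server-range arithmetic in the hunk above (using the single-server values from the example cluster.conf; the loop over two pids is only for illustration), start = pid*nservers_per_procs % nservers_per_group and end = start + nservers_per_group give each process one Server with sid 0:

    #include <cstdio>

    // Stand-alone check of the start/end computation in Trainer::Start,
    // with assumed cluster values (one server group, one server per group/process).
    int main() {
      const int nservers_per_procs = 1;
      const int nservers_per_group = 1;
      for (int pid = 0; pid < 2; pid++) {
        int start = pid * nservers_per_procs % nservers_per_group;
        int end = start + nservers_per_group;
        std::printf("pid %d hosts server ids [%d, %d)\n", pid, start, end);
      }
      return 0;
    }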

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f29d93ff/src/trainer/worker.cc
----------------------------------------------------------------------
diff --cc src/trainer/worker.cc
index 7565d49,138c954..dfbe989
--- a/src/trainer/worker.cc
+++ b/src/trainer/worker.cc
@@@ -22,67 -20,27 +22,68 @@@ void Worker::Setup(const ModelProto& mo
  }
  
  void Worker::Run(){
 -  param_dealer_=std::make_shared<Dealer>(thread_id_*2+1);
 +  param_dealer_=make_shared<Dealer>(2*thread_id_);
    param_dealer_->Connect(kInprocRouterEndpoint);
 +  param_poller_.Add(param_dealer_.get());
 +  layer_dealer_=make_shared<Dealer>(2*thread_id_+1);
 +  layer_dealer_->Connect(kInprocRouterEndpoint);
 +
 +  { // TODO remove waiting pong msg
 +  Msg* ping=new Msg();
 +  ping->set_src(group_id_, worker_id_, kWorkerParam);
 +  ping->set_dst(-1,-1,kStub);
 +  ping->set_type(kConnect);
 +  ping->add_frame("PING", 4);
 +  param_dealer_->Send(&ping);
 +  ping=param_dealer_->Receive();
 +  string pong((char*)ping->frame_data(), ping->frame_size());
 +  CHECK_STREQ("PONG", pong.c_str());
 +  delete ping;
 +  }
 +
 +  {
 +  Msg* ping=new Msg();
 +  ping->set_src(group_id_, worker_id_, kWorkerLayer);
 +  ping->set_dst(-1,-1,kStub);
 +  ping->set_type(kConnect);
 +  ping->add_frame("PING", 4);
 +  layer_dealer_->Send(&ping);
 +  ping=layer_dealer_->Receive();
 +  string pong((char*)ping->frame_data(), ping->frame_size());
 +  CHECK_STREQ("PONG", pong.c_str());
 +  delete ping;
 +  }
 +  step_=modelproto_.step();
+   //layer_dealer_=std::make_shared<Dealer>(thread_id_*2);
    // init params
 -  for(auto layer: train_net_->layers())
 -    if(group_id_==0&&layer->locationid()==worker_id_)
 +  for(auto layer: train_net_->layers()){
 +    //LOG(ERROR)<<layer->partitionid()<<" : "<<layer->name();
 +    if(layer->partitionid()==worker_id_)
        for(auto param: layer->GetParams()){
 -        if(param->owner()<0||param->owner()==param->id()){
 -          param->Init();
 -          Put(param, step_);
 +        if(group_id_==0){
 +          if(param->owner()==param->id()){
 +            param->Init(0);
 +            Put(param, step_);
 +          }else{
 +            Get(param, 0);
 +          }
 +        }else{
 +          Get(param, modelproto_.warmup_steps());
          }
 -        else
 -          Get(param, step_);
        }
 -
 -  step_=modelproto_.step();
 -  Performance perf(train_net_);
 +  }
 +  Metric perf;
 +  if(group_id_==0&&step_<modelproto_.warmup_steps()){
 +    for(step_=0;step_<modelproto_.warmup_steps();step_++)
 +      RunOneBatch(step_, &perf);
 +    for(auto layer: train_net_->layers()){
 +      //LOG(ERROR)<<layer->partitionid()<<" : "<<layer->name();
 +      if(layer->partitionid()==worker_id_)
 +        for(auto param: layer->GetParams())
 +          if(param->owner()==param->id())
 +            Put(param, step_);
 +    }
 +  }
    while(!StopNow(step_)){
      RunOneBatch(step_, &perf);
      step_++;
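
The parameter bootstrap in Worker::Run above boils down to: in group 0, the worker owning a Param initializes it and Puts it to the servers while the other replicas Get it, and non-zero groups only Get, waiting until warmup_steps have produced values. A minimal stand-alone sketch of that decision logic, with hypothetical stand-in types in place of SINGA's Param and NeuralNet classes:

    #include <cstdio>
    #include <vector>

    // Stand-in for a Param: owner == id means this copy owns the values.
    struct FakeParam { int id; int owner; };

    // Mirrors the branch in Worker::Run: init-and-put for owned params in
    // group 0, get for replicas, get-after-warmup for other groups.
    void BootstrapParam(int group_id, int warmup_steps, const FakeParam& p) {
      if (group_id == 0) {
        if (p.owner == p.id)
          std::printf("param %d: Init then Put at step 0\n", p.id);
        else
          std::printf("param %d: Get at step 0 (replica of %d)\n", p.id, p.owner);
      } else {
        std::printf("param %d: Get at warmup step %d\n", p.id, warmup_steps);
      }
    }

    int main() {
      std::vector<FakeParam> params = {{0, 0}, {1, 0}, {2, 2}};
      for (const auto& p : params) BootstrapParam(/*group_id=*/0, /*warmup_steps=*/100, p);
      for (const auto& p : params) BootstrapParam(/*group_id=*/1, /*warmup_steps=*/100, p);
      return 0;
    }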

