singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [07/13] incubator-singa git commit: SINGA-70 Refactor API of Layer, Worker, Server and Driver
Date Sun, 27 Sep 2015 14:34:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/connection_layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/connection_layer.cc b/src/neuralnet/connection_layer.cc
index 1ba2d95..acf243d 100644
--- a/src/neuralnet/connection_layer.cc
+++ b/src/neuralnet/connection_layer.cc
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,17 +24,24 @@
 namespace singa {
 
 using std::vector;
-
+/********* Implementation for BridgeDstLayer **************/
+void BridgeDstLayer::Setup(const LayerProto& proto,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(proto, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  grad_.ReshapeLike(data_);
+}
 /************* Implementation for ConcateLayer ***********/
-void ConcateLayer::Setup(const LayerProto& proto, int npartitions) {
-  // CHECK_EQ(npartitions, 1);
-  Layer::Setup(proto, npartitions);
-  size_t concate_dim = proto.concate_conf().concate_dim();
+void ConcateLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(conf, srclayers);
+  size_t concate_dim = conf.concate_conf().concate_dim();
   CHECK_GE(concate_dim, 0);
-  CHECK_GT(srclayers_.size(), 1);
-  vector<int> shape = srclayers_[0]->data(this).shape();
-  for (size_t i = 1; i < srclayers_.size(); i++) {
-    const vector<int>& srcshape = srclayers_[i]->data(this).shape();
+  CHECK_GT(srclayers.size(), 1);
+  vector<int> shape = srclayers[0]->data(this).shape();
+  for (size_t i = 1; i < srclayers.size(); i++) {
+    const vector<int>& srcshape = srclayers[i]->data(this).shape();
     for (size_t j = 0; j < shape.size(); j++)
       if (j == concate_dim)
         shape[j] += srcshape[j];
@@ -45,23 +52,24 @@ void ConcateLayer::Setup(const LayerProto& proto, int npartitions) {
   grad_.Reshape(shape);
 }
 
-void ConcateLayer::ComputeFeature(int flag, Metric *perf) {
+void ConcateLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   LOG(FATAL) << "Not implemented for Concate Layer";
 }
 
-void ConcateLayer::ComputeGradient(int flag, Metric* perf) {
+void ConcateLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   LOG(FATAL) << "Not implemented for Concate Layer";
 }
 
 /************* Implementation for SliceLayer****************/
-void SliceLayer::Setup(const LayerProto& proto, int npartitions) {
+void SliceLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
   /*
-  Layer::Setup(proto, npartitions);
-  slice_dim_ = proto.slice_conf().slice_dim();
+  Layer::Setup(conf, npartitions);
+  slice_dim_ = conf.slice_conf().slice_dim();
   slice_num_ = npartitions;
   CHECK_GE(slice_dim_, 0);
   CHECK_EQ(slice_num_, dstlayers_.size());
-  data_.Reshape(srclayers_[0]->data(this).shape());
+  data_.Reshape(srclayers[0]->data(this).shape());
   grad_.ReshapeLike(data_);
   datavec_.resize(slice_num_);
   gradvec_.resize(slice_num_);
@@ -79,11 +87,11 @@ void SliceLayer::Setup(const LayerProto& proto, int npartitions) {
   LOG(FATAL) << "Not implemented";
 }
 
-void SliceLayer::ComputeFeature(int flag, Metric *perf) {
+void SliceLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   /*
-  CHECK_EQ(srclayers_.size(), 1);
+  CHECK_EQ(srclayers.size(), 1);
   if (slice_dim_ == 0) {
-    const auto& blob = srclayers_.at(0)->data(this);
+    const auto& blob = srclayers.at(0)->data(this);
     int size = blob.count() / slice_num_;
     for (int i = 0; i < slice_num_; i++) {
       float* dst = datavec_[i].mutable_cpu_data();
@@ -95,7 +103,7 @@ void SliceLayer::ComputeFeature(int flag, Metric *perf) {
   LOG(FATAL) << "Not implemented";
 }
 
-void SliceLayer::ComputeGradient(int flag, Metric* perf) {
+void SliceLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   LOG(FATAL) << "Not implemented";
 }
 
@@ -112,19 +120,19 @@ int SliceLayer::SliceID(const Layer* layer) const {
 }*/
 
 /************* Implementation for SplitLayer****************/
-void SplitLayer::Setup(const LayerProto& proto, int npartitions) {
-  // CHECK_EQ(npartitions, 1);
-  Layer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  data_.Reshape(srclayers_[0]->data(this).shape());
-  grad_.Reshape(srclayers_[0]->data(this).shape());
+void SplitLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(conf, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  grad_.Reshape(srclayers[0]->data(this).shape());
 }
 
-void SplitLayer::ComputeFeature(int flag, Metric *perf) {
+void SplitLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   LOG(FATAL) << "Not implemented";
 }
 
-void SplitLayer::ComputeGradient(int flag, Metric* perf) {
+void SplitLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   LOG(FATAL) << "Not implemented";
 }
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/input_layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/input_layer.cc b/src/neuralnet/input_layer.cc
index a608ba4..f89369c 100644
--- a/src/neuralnet/input_layer.cc
+++ b/src/neuralnet/input_layer.cc
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,9 +34,9 @@ using std::string;
 using std::vector;
 
 /************* Implementation for ParserLayer ***********/
-void ParserLayer::ComputeFeature(int flag, Metric *perf) {
-  CHECK_EQ(srclayers_.size(), 1);
-  auto datalayer = dynamic_cast<DataLayer*>(*srclayers_.begin());
+void ParserLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
+  CHECK_EQ(srclayers.size(), 1);
+  auto datalayer = dynamic_cast<DataLayer*>(*srclayers.begin());
   ParseRecords(flag, datalayer->records(), &data_);
 }
 
@@ -48,8 +48,9 @@ LMDBDataLayer::~LMDBDataLayer() {
   mdb_cursor_ = nullptr;
 }
 
-void LMDBDataLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
+void LMDBDataLayer::Setup(const LayerProto& proto,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(proto, srclayers);
   OpenLMDB(proto.lmdbdata_conf().path());
   CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_NEXT),
            MDB_SUCCESS);
@@ -62,7 +63,7 @@ void LMDBDataLayer::Setup(const LayerProto& proto, int npartitions) {
   ConvertCaffeDatumToRecord(datum, record);
   batchsize_ = proto.lmdbdata_conf().batchsize();
   if (partition_dim() == 0)
-    batchsize_ /= npartitions;
+    batchsize_ /= proto.num_partitions();
   records_.resize(batchsize_);
   random_skip_ = proto.lmdbdata_conf().random_skip();
 }
@@ -83,9 +84,9 @@ void LMDBDataLayer::OpenLMDB(const std::string& path) {
            MDB_SUCCESS) << "mdb_cursor_get failed";
 }
 
-void LMDBDataLayer::ComputeFeature(int flag, Metric* perf) {
+void LMDBDataLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   if (mdb_cursor_ == nullptr)
-    OpenLMDB(layer_proto_.lmdbdata_conf().path());
+    OpenLMDB(layer_conf_.lmdbdata_conf().path());
   if (random_skip_) {
     int nskip = rand() % random_skip_;
     int n = 0;
@@ -155,8 +156,9 @@ ShardDataLayer::~ShardDataLayer() {
   shard_ = nullptr;
 }
 
-void ShardDataLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
+void ShardDataLayer::Setup(const LayerProto& proto,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(proto, srclayers);
   shard_ = new DataShard(proto.sharddata_conf().path(), DataShard::kRead);
   string key;
   shard_->Next(&key, &sample_);
@@ -164,14 +166,14 @@ void ShardDataLayer::Setup(const LayerProto& proto, int npartitions) {
   shard_ = nullptr;
   batchsize_ = proto.sharddata_conf().batchsize();
   if (partition_dim() == 0)
-    batchsize_ /= npartitions;
+    batchsize_ /= proto.num_partitions();
   records_.resize(batchsize_);
   random_skip_ = proto.sharddata_conf().random_skip();
 }
 
-void ShardDataLayer::ComputeFeature(int flag, Metric* perf) {
+void ShardDataLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   if (shard_ == nullptr)
-    shard_ = new DataShard(layer_proto_.sharddata_conf().path(),
+    shard_ = new DataShard(layer_conf_.sharddata_conf().path(),
                            DataShard::kRead);
   if (random_skip_) {
     int nskip = rand() % random_skip_;
@@ -193,15 +195,16 @@ void ShardDataLayer::ComputeFeature(int flag, Metric* perf) {
 }
 
 /********* Implementation for LabelLayer **************/
-void LabelLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  int batchsize = dynamic_cast<DataLayer*>(srclayers_[0])->batchsize();
+void LabelLayer::Setup(const LayerProto& proto,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(proto, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  int batchsize = dynamic_cast<DataLayer*>(srclayers[0])->batchsize();
   data_.Reshape(vector<int>{batchsize});
 }
 
 void LabelLayer::ParseRecords(int flag, const vector<Record>& records,
-                              Blob<float>* blob) {
+    Blob<float>* blob) {
   int rid = 0;
   float *label = blob->mutable_cpu_data();
   for (const Record& record : records) {
@@ -212,8 +215,8 @@ void LabelLayer::ParseRecords(int flag, const vector<Record>& records,
 }
 
 /**************** Implementation for MnistLayer ******************/
-void MnistLayer::ParseRecords(int flag,
-    const vector<Record>& records, Blob<float>* blob) {
+void MnistLayer::ParseRecords(int flag, const vector<Record>& records,
+    Blob<float>* blob) {
   LOG_IF(ERROR, records.size() == 0) << "Empty records to parse";
   int ndim = records.at(0).image().shape_size();
   int inputsize = records.at(0).image().shape(ndim-1);
@@ -246,11 +249,12 @@ void MnistLayer::ParseRecords(int flag,
   CHECK_EQ(dptr, blob->mutable_cpu_data() + blob->count());
 }
 
-void MnistLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  int batchsize = dynamic_cast<DataLayer*>(srclayers_[0])->batchsize();
-  Record sample = dynamic_cast<DataLayer*>(srclayers_[0])->sample();
+void MnistLayer::Setup(const LayerProto& proto,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(proto, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  int batchsize = dynamic_cast<DataLayer*>(srclayers[0])->batchsize();
+  Record sample = dynamic_cast<DataLayer*>(srclayers[0])->sample();
   norm_a_ = proto.mnist_conf().norm_a();
   norm_b_ = proto.mnist_conf().norm_b();
   int ndim = sample.image().shape_size();
@@ -261,8 +265,8 @@ void MnistLayer::Setup(const LayerProto& proto, int npartitions) {
 }
 
 /*************** Implementation for RGBImageLayer *************************/
-void RGBImageLayer::ParseRecords(int flag,
-    const vector<Record>& records, Blob<float>* blob) {
+void RGBImageLayer::ParseRecords(int flag, const vector<Record>& records,
+    Blob<float>* blob) {
   const vector<int>& s = blob->shape();
   Tensor<cpu, 4> images(data_.mutable_cpu_data(),
       Shape4(s[0], s[1], s[2], s[3]));
@@ -315,14 +319,15 @@ void RGBImageLayer::ParseRecords(int flag,
     FreeSpace(croped_image);
 }
 
-void RGBImageLayer::Setup(const LayerProto& proto, int npartitions) {
-  ParserLayer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
+void RGBImageLayer::Setup(const LayerProto& proto,
+    const vector<Layer*>& srclayers) {
+  ParserLayer::Setup(proto, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
   scale_ = proto.rgbimage_conf().scale();
   cropsize_ = proto.rgbimage_conf().cropsize();
   mirror_ = proto.rgbimage_conf().mirror();
-  int batchsize = dynamic_cast<DataLayer*>(srclayers_[0])->batchsize();
-  Record sample = dynamic_cast<DataLayer*>(srclayers_[0])->sample();
+  int batchsize = dynamic_cast<DataLayer*>(srclayers[0])->batchsize();
+  Record sample = dynamic_cast<DataLayer*>(srclayers[0])->sample();
   vector<int> shape;
   shape.push_back(batchsize);
   for (int x : sample.image().shape()) {
@@ -361,7 +366,7 @@ PrefetchLayer::~PrefetchLayer() {
 }
 
 
-void PrefetchLayer::ComputeFeature(int flag, Metric* perf) {
+void PrefetchLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   LOG(FATAL) << "Not implemented";
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/layer.cc b/src/neuralnet/layer.cc
index d818533..e229045 100644
--- a/src/neuralnet/layer.cc
+++ b/src/neuralnet/layer.cc
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,7 +42,9 @@ Layer* Layer::Create(const LayerProto& proto) {
   return layer;
 }
 
-const string Layer::DebugString(int step, int flag) {
+const std::string Layer::ToString(bool debug, int flag) {
+  if (!debug)
+    return "";
   string ret = StringPrintf("Layer %10s ", name().c_str());
   if ((flag & kForward) == kForward && data_.count() !=0) {
     ret += StringPrintf("data norm1 %13.9f", data_.asum_data());
@@ -60,4 +62,15 @@ const string Layer::DebugString(int step, int flag) {
   }
   return ret;
 }
+
+const std::string LossLayer::ToString(bool debug, int flag) {
+  std::string disp;
+  if (debug) {
+    disp = Layer::ToString(debug, flag);
+  } else {
+    disp = metric_.ToLogString();
+    metric_.Reset();
+  }
+  return disp;
+}
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/loss_layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/loss_layer.cc b/src/neuralnet/loss_layer.cc
index d8fd92b..b5447f6 100644
--- a/src/neuralnet/loss_layer.cc
+++ b/src/neuralnet/loss_layer.cc
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,50 +40,59 @@ using std::string;
 using std::vector;
 
 /********** * Implementation for EuclideanLossLayer*************************/
-void EuclideanLossLayer::ComputeFeature(int flag, Metric* perf) {
-  int count = srclayers_[0]->data(this).count();
-  CHECK_EQ(count, srclayers_[1]->data(this).count());
-  const float* reconstruct_dptr = srclayers_[0]->data(this).cpu_data();
-  const float* input_dptr = srclayers_[1]->data(this).cpu_data();
+void EuclideanLossLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  CHECK_EQ(srclayers.size(), 2);
+  Layer::Setup(conf, srclayers);
+}
+
+void EuclideanLossLayer::ComputeFeature(int flag,
+    const vector<Layer*>& srclayers) {
+  int count = srclayers[0]->data(this).count();
+  CHECK_EQ(count, srclayers[1]->data(this).count());
+  const float* reconstruct_dptr = srclayers[0]->data(this).cpu_data();
+  const float* input_dptr = srclayers[1]->data(this).cpu_data();
   float loss = 0;
   for (int i = 0; i < count; i++) {
       loss += (input_dptr[i] - reconstruct_dptr[i]) *
         (input_dptr[i] - reconstruct_dptr[i]);
   }
-  perf->Add("loss", loss / srclayers_[0]->data(this).shape()[0]);
+  metric_.Add("loss", loss / srclayers[0]->data(this).shape()[0]);
 }
-void EuclideanLossLayer::ComputeGradient(int flag, Metric* perf) {
-  int count = srclayers_[0]->data(this).count();
-  CHECK_EQ(count, srclayers_[1]->data(this).count());
-  const float* reconstruct_dptr = srclayers_[0]->data(this).cpu_data();
-  const float* input_dptr = srclayers_[1]->data(this).cpu_data();
-  Blob<float>* gsrcblob = srclayers_[0]->mutable_grad(this);
+
+void EuclideanLossLayer::ComputeGradient(int flag,
+    const vector<Layer*>& srclayers) {
+  int count = srclayers[0]->data(this).count();
+  CHECK_EQ(count, srclayers[1]->data(this).count());
+  const float* reconstruct_dptr = srclayers[0]->data(this).cpu_data();
+  const float* input_dptr = srclayers[1]->data(this).cpu_data();
+  Blob<float>* gsrcblob = srclayers[0]->mutable_grad(this);
   float* gsrcptr = gsrcblob->mutable_cpu_data();
   for (int i = 0; i < count; i++) {
     gsrcptr[i] = reconstruct_dptr[i]-input_dptr[i];
   }
   Tensor<cpu, 1> gsrc(gsrcptr, Shape1(gsrcblob->count()));
-  gsrc /= srclayers_[0]->data(this).shape()[0];
+  gsrc /= srclayers[0]->data(this).shape()[0];
 }
 
-
 /********** * Implementation for SoftmaxLossLayer*************************/
-void SoftmaxLossLayer::Setup(const LayerProto& proto, int npartitions) {
-  LossLayer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 2);
-  data_.Reshape(srclayers_[0]->data(this).shape());
+void SoftmaxLossLayer::Setup(const LayerProto& proto,
+    const vector<Layer*>& srclayers) {
+  CHECK_EQ(srclayers.size(), 2);
+  LossLayer::Setup(proto, srclayers);
+  data_.Reshape(srclayers[0]->data(this).shape());
   batchsize_ = data_.shape()[0];
   dim_ = data_.count() / batchsize_;
   topk_ = proto.softmaxloss_conf().topk();
-  metric_.Reshape(vector<int>{2});
   scale_ = proto.softmaxloss_conf().scale();
 }
-void SoftmaxLossLayer::ComputeFeature(int flag, Metric* perf) {
+void SoftmaxLossLayer::ComputeFeature(int flag,
+    const vector<Layer*>& srclayers) {
   Shape<2> s = Shape2(batchsize_, dim_);
   Tensor<cpu, 2> prob(data_.mutable_cpu_data(), s);
-  Tensor<cpu, 2> src(srclayers_[0]->mutable_data(this)->mutable_cpu_data(), s);
+  Tensor<cpu, 2> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s);
   Softmax(prob, src);
-  const float* label = srclayers_[1]->data(this).cpu_data();
+  const float* label = srclayers[1]->data(this).cpu_data();
   const float* probptr = prob.dptr;
   float loss = 0, precision = 0;
   for (int n = 0; n < batchsize_; n++) {
@@ -108,13 +117,14 @@ void SoftmaxLossLayer::ComputeFeature(int flag, Metric* perf) {
     probptr += dim_;
   }
   CHECK_EQ(probptr, prob.dptr + prob.shape.Size());
-  perf->Add("loss", loss * scale_ / (1.0f * batchsize_));
-  perf->Add("accuracy", precision * scale_ / (1.0f * batchsize_));
+  metric_.Add("loss", loss * scale_ / (1.0f * batchsize_));
+  metric_.Add("accuracy", precision * scale_ / (1.0f * batchsize_));
 }
 
-void SoftmaxLossLayer::ComputeGradient(int flag, Metric* perf) {
-  const float* label = srclayers_[1]->data(this).cpu_data();
-  Blob<float>* gsrcblob = srclayers_[0]->mutable_grad(this);
+void SoftmaxLossLayer::ComputeGradient(int flag,
+    const vector<Layer*>& srclayers) {
+  const float* label = srclayers[1]->data(this).cpu_data();
+  Blob<float>* gsrcblob = srclayers[0]->mutable_grad(this);
   gsrcblob->CopyFrom(data_);
   float* gsrcptr = gsrcblob->mutable_cpu_data();
   for (int n = 0; n < batchsize_; n++) {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
index 775a5a7..ec23c23 100644
--- a/src/neuralnet/neuralnet.cc
+++ b/src/neuralnet/neuralnet.cc
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -94,6 +94,7 @@ NeuralNet::~NeuralNet() {
     delete layer;
 }
 
+/*
 std::string NeuralNet::ToAdjacency() {
   string disp = "";
   for (auto& layer : layers_) {
@@ -104,6 +105,7 @@ std::string NeuralNet::ToAdjacency() {
   }
   return disp;
 }
+*/
 
 void NeuralNet::ShareParamsFrom(NeuralNet* other) {
   for (auto& layer : layers_) {
@@ -215,6 +217,7 @@ Graph* NeuralNet::CreateGraph(const NetProto& netproto, int npartitions) {
         // differentiate partitions
         string nodename = layer.name() + "@" + string(suffix);
         proto->set_partition_id(i);
+        proto->set_num_partitions(npartitions);
         proto->set_name(nodename);
         auto node = new Node(nodename, layer.name(), i, proto);
         graph->AddNode(node);
@@ -321,21 +324,19 @@ void NeuralNet::CreateNetFromGraph(Graph* graph, int npartitions) {
   }
   // connect layers
   for (Node* node : graph->nodes()) {
-    auto layer = name2layer_[node->name];
-    layer->clear_dstlayers();
-    for (Node* dst : node->dstnodes)
-      layer->add_dstlayer(name2layer_[dst->name]);
-    layer->clear_srclayers();
+    auto layer = name2layer(node->name);
+    src_map_[layer] = vector<Layer*>{};
     for (Node* src : node->srcnodes)
-      layer->add_srclayer(name2layer_[src->name]);
+      src_map_[layer].push_back(name2layer(src->name));
   }
+
   // setup layers
   int paramid = 0;
   map<string, string> layerinfo;
   map<string, vector<Layer*>> share_param_layers;
   for (Node* node : graph->nodes()) {
-    auto layer = name2layer_[node->name];
-    layer->Setup(*(static_cast<LayerProto*>(node->proto)), npartitions);
+    auto layer = name2layer(node->name);
+    layer->Setup(*(static_cast<LayerProto*>(node->proto)), srclayers(layer));
     LOG(INFO) << "constructing graph: " << layer->name();
     layerinfo[layer->name()] = IntVecToString(layer->data(nullptr).shape());
     string param_name = "$";

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/neuralnet/neuron_layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuron_layer.cc b/src/neuralnet/neuron_layer.cc
index 9e7831a..4e3acf0 100644
--- a/src/neuralnet/neuron_layer.cc
+++ b/src/neuralnet/neuron_layer.cc
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -73,17 +73,19 @@ ConvolutionLayer::~ConvolutionLayer() {
   delete weight_;
   delete bias_;
 }
-void ConvolutionLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  ConvolutionProto conv_conf = proto.convolution_conf();
+void ConvolutionLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  CHECK_EQ(srclayers.size(), 1);
+  Layer::Setup(conf, srclayers);
+  ConvolutionProto conv_conf = conf.convolution_conf();
   kernel_ = conv_conf.kernel();
   CHECK_GT(kernel_, 0) << "Filter size cannot be zero.";
   pad_ = conv_conf.pad();
   stride_ = conv_conf.stride();
   num_filters_ = conv_conf.num_filters();
   if (partition_dim() > 0)
-    num_filters_ /= npartitions;
-  const vector<int>& srcshape = srclayers_[0]->data(this).shape();
+    num_filters_ /= srclayers.at(0)->num_partitions();
+  const vector<int>& srcshape = srclayers[0]->data(this).shape();
   int dim = srcshape.size();
   CHECK_GT(dim, 2);
   width_ = srcshape[dim - 1];
@@ -102,14 +104,15 @@ void ConvolutionLayer::Setup(const LayerProto& proto, int npartitions) {
   grad_.Reshape(shape);
   col_data_.Reshape(vector<int>{col_height_, col_width_});
   col_grad_.Reshape(vector<int>{col_height_, col_width_});
-  weight_ = Param::Create(proto.param(0));
-  bias_ = Param::Create(proto.param(1));
+  weight_ = Param::Create(conf.param(0));
+  bias_ = Param::Create(conf.param(1));
   weight_->Setup(vector<int>{num_filters_, col_height_});
   bias_->Setup(vector<int>{num_filters_});
 }
 
-void ConvolutionLayer::ComputeFeature(int flag, Metric* perf) {
-  auto src = Tensor4(srclayers_[0]->mutable_data(this));
+void ConvolutionLayer::ComputeFeature(int flag,
+    const vector<Layer*>& srclayers) {
+  auto src = Tensor4(srclayers[0]->mutable_data(this));
   auto data = Tensor3(&data_);
   auto col = Tensor2(&col_data_);
   auto weight = Tensor2(weight_->mutable_data());
@@ -124,15 +127,16 @@ void ConvolutionLayer::ComputeFeature(int flag, Metric* perf) {
   data += expr::broadcast<1>(bias, data.shape);
 }
 
-void ConvolutionLayer::ComputeGradient(int flag, Metric* perf) {
-  auto src = Tensor4(srclayers_[0]->mutable_data(this));
+void ConvolutionLayer::ComputeGradient(int flag,
+    const vector<Layer*>& srclayers) {
+  auto src = Tensor4(srclayers[0]->mutable_data(this));
   auto col = Tensor2(&col_data_);
   auto weight = Tensor2(weight_->mutable_data());
   auto grad = Tensor3(&grad_);
   auto gcol = Tensor2(&col_grad_);
   auto gweight = Tensor2(weight_->mutable_grad());
   auto gbias = Tensor1(bias_->mutable_grad());
-  Blob<float>* gsrcblob = srclayers_[0]->mutable_grad(this);
+  Blob<float>* gsrcblob = srclayers[0]->mutable_grad(this);
   Tensor<cpu, 4> gsrc(nullptr, Shape4(batchsize_, channels_, height_, width_));
   if (gsrcblob != nullptr)
     gsrc.dptr = gsrcblob->mutable_cpu_data();
@@ -157,8 +161,9 @@ void ConvolutionLayer::ComputeGradient(int flag, Metric* perf) {
 }
 
 /******************* Implementation for CConvolutionLayer *********/
-void CConvolutionLayer::ComputeFeature(int flag, Metric* perf) {
-  auto src = Tensor4(srclayers_[0]->mutable_data(this));
+void CConvolutionLayer::ComputeFeature(int flag,
+    const vector<Layer*>& srclayers) {
+  auto src = Tensor4(srclayers[0]->mutable_data(this));
   auto data = Tensor3(&data_);
   auto col = Tensor2(&col_data_);
   auto weight = Tensor2(weight_->mutable_data());
@@ -172,8 +177,9 @@ void CConvolutionLayer::ComputeFeature(int flag, Metric* perf) {
   data += expr::broadcast<1>(bias, data.shape);
 }
 
-void CConvolutionLayer::ComputeGradient(int flag, Metric* perf) {
-  auto src = Tensor4(srclayers_[0]->mutable_data(this));
+void CConvolutionLayer::ComputeGradient(int flag,
+    const vector<Layer*>& srclayers) {
+  auto src = Tensor4(srclayers[0]->mutable_data(this));
   auto col = Tensor2(&col_data_);
   auto weight = Tensor2(weight_->mutable_data());
 
@@ -182,7 +188,7 @@ void CConvolutionLayer::ComputeGradient(int flag, Metric* perf) {
   auto gweight = Tensor2(weight_->mutable_grad());
   auto gbias = Tensor1(bias_->mutable_grad());
   gweight = 0.f;
-  Blob<float>* gsrcblob = srclayers_[0]->mutable_grad(this);
+  Blob<float>* gsrcblob = srclayers[0]->mutable_grad(this);
   Tensor<cpu, 4> gsrc(nullptr, Shape4(batchsize_, channels_, height_, width_));
   if (gsrcblob != nullptr)
     gsrc.dptr = gsrcblob->mutable_cpu_data();
@@ -200,18 +206,19 @@ void CConvolutionLayer::ComputeGradient(int flag, Metric* perf) {
 }
 
 /****************** Implementation for DropoutLayer ***********************/
-void DropoutLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  data_.ReshapeLike(srclayers_[0]->data(this));
-  grad_.ReshapeLike(*srclayers_[0]->mutable_grad(this));
-  mask_.Reshape(srclayers_[0]->data(this).shape());
-  pdrop_ = proto.dropout_conf().dropout_ratio();
+void DropoutLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(conf, srclayers);
+  data_.ReshapeLike(srclayers[0]->data(this));
+  grad_.ReshapeLike(*srclayers[0]->mutable_grad(this));
+  mask_.Reshape(srclayers[0]->data(this).shape());
+  pdrop_ = conf.dropout_conf().dropout_ratio();
 }
 
-void DropoutLayer::ComputeFeature(int flag, Metric* perf) {
+void DropoutLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   // check training
   if ((flag & kTrain) != kTrain) {
-    data_.CopyFrom(srclayers_[0]->data(this));
+    data_.CopyFrom(srclayers[0]->data(this));
     return;
   }
   float pkeep = 1 - pdrop_;
@@ -219,14 +226,14 @@ void DropoutLayer::ComputeFeature(int flag, Metric* perf) {
   mask = expr::F<op::threshold>(TSingleton<Random<cpu>>::Instance() \
                       ->uniform(mask.shape), pkeep) * (1.0f/pkeep);
   auto data = Tensor1(&data_);
-  auto src = Tensor1(srclayers_[0]->mutable_data(this));
+  auto src = Tensor1(srclayers[0]->mutable_data(this));
   data = src * mask;
 }
 
-void DropoutLayer::ComputeGradient(int flag, Metric* perf)  {
+void DropoutLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers)  {
   auto mask = Tensor1(&mask_);
   auto grad = Tensor1(&grad_);
-  auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this));
+  auto gsrc = Tensor1(srclayers[0]->mutable_grad(this));
   gsrc = grad * mask;
 }
 
@@ -251,11 +258,10 @@ Blob<float>* RBMLayer::Sample(int flag) {
   return (flag & kPositive) == kPositive || first_gibbs_ ?
     &sample_ : &neg_sample_;
 }
-void RBMLayer::Setup(const LayerProto& proto, int npartitions) {
-  CHECK_EQ(npartitions, 1);  // TODO(wangwei) test for npartitions > 1
-  Layer::Setup(proto, npartitions);
-  hdim_ = proto.rbm_conf().hdim();
-  gaussian_ = proto.rbm_conf().gaussian();
+void RBMLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) {
+  Layer::Setup(conf, srclayers);
+  hdim_ = conf.rbm_conf().hdim();
+  gaussian_ = conf.rbm_conf().gaussian();
   first_gibbs_ = true;
 }
 /**************** Implementation for RBMVisLayer********************/
@@ -264,32 +270,33 @@ RBMVisLayer::~RBMVisLayer() {
   delete bias_;
 }
 
-void RBMVisLayer::Setup(const LayerProto& proto, int npartitions) {
-  RBMLayer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 2);
+void RBMVisLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  CHECK_EQ(srclayers.size(), 2);
+  RBMLayer::Setup(conf, srclayers);
+  CHECK_EQ(srclayers.size(), 2);
   hid_layer_ = nullptr;
-  for (auto src : srclayers_) {
-    for (auto dst : src->srclayers()) {
-      if (dst->name() == name()) {
-        CHECK(hid_layer_ == nullptr);
-        hid_layer_ = static_cast<RBMHidLayer*>(src);
-      }
+  for (auto src : srclayers) {
+    if (typeid(*src) == typeid(RBMHidLayer)) {
+      // note: the hid layer may not have been set up yet.
+      CHECK(hid_layer_ == nullptr);
+      hid_layer_ = dynamic_cast<RBMHidLayer*>(src);
     }
   }
-  input_layer_ = srclayers_[0] != hid_layer_ ? srclayers_[0]: srclayers_[1];
+  input_layer_ = srclayers[0] != hid_layer_ ? srclayers[0]: srclayers[1];
   const auto& src = input_layer_->data(this);
   batchsize_ = src.shape()[0];
   data_.ReshapeLike(src);
   neg_data_.ReshapeLike(data_);
   neg_sample_.ReshapeLike(data_);
   vdim_ = src.count() / batchsize_;
-  weight_ = Param::Create(proto.param(0));
+  weight_ = Param::Create(conf.param(0));
   weight_ ->Setup(vector<int>{hdim_, vdim_});
-  bias_ = Param::Create(proto.param(1));
+  bias_ = Param::Create(conf.param(1));
   bias_->Setup(vector<int>{vdim_});
 }
 
-void RBMVisLayer::ComputeFeature(int flag, Metric* perf) {
+void RBMVisLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   if ((flag & kPositive) == kPositive) {
     data_.CopyFrom(input_layer_->data(this), true);
     first_gibbs_ = true;
@@ -308,13 +315,13 @@ void RBMVisLayer::ComputeFeature(int flag, Metric* perf) {
       for (int i = 0; i < data_.count(); i++) {
         err += (dptr[i] - rcns[i]) * (dptr[i] - rcns[i]);
       }
-      perf->Add("Squared Error", err / batchsize_);
+      metric_.Add("Squared Error", err / batchsize_);
     }
     first_gibbs_ = false;
   }
 }
 
-void RBMVisLayer::ComputeGradient(int flag, Metric* perf) {
+void RBMVisLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   auto vis_pos = Tensor2(&data_);
   auto vis_neg = Tensor2(&neg_data_);
   auto hid_pos = Tensor2(hid_layer_->mutable_data(this));
@@ -336,25 +343,25 @@ RBMHidLayer::~RBMHidLayer() {
   delete bias_;
 }
 
-void RBMHidLayer::Setup(const LayerProto& proto,
-      int npartitions) {
-  RBMLayer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  const auto& src_data = srclayers_[0]->data(this);
+void RBMHidLayer::Setup(const LayerProto& conf,
+      const vector<Layer*>& srclayers) {
+  RBMLayer::Setup(conf, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  const auto& src_data = srclayers[0]->data(this);
   batchsize_ = src_data.shape()[0];
   vdim_ = src_data.count() / batchsize_;
   data_.Reshape(vector<int>{batchsize_, hdim_});
   neg_data_.ReshapeLike(data_);
   sample_.ReshapeLike(data_);
   neg_sample_.ReshapeLike(data_);
-  weight_ = Param::Create(proto.param(0));
+  weight_ = Param::Create(conf.param(0));
   weight_->Setup(vector<int>{hdim_, vdim_});
-  bias_ = Param::Create(proto.param(1));
+  bias_ = Param::Create(conf.param(1));
   bias_->Setup(vector<int>{hdim_});
-  vis_layer_ = static_cast<RBMVisLayer*> (srclayers_[0]);
+  vis_layer_ = dynamic_cast<RBMVisLayer*> (srclayers[0]);
 }
 
-void RBMHidLayer::ComputeFeature(int flag, Metric* perf) {
+void RBMHidLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   auto weight = Tensor2(weight_->mutable_data());
   auto bias = Tensor1(bias_->mutable_data());
 
@@ -376,7 +383,7 @@ void RBMHidLayer::ComputeFeature(int flag, Metric* perf) {
     data = expr::F<op::sigmoid>(data);
 }
 
-void RBMHidLayer::ComputeGradient(int flag, Metric* perf) {
+void RBMHidLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   auto hid_pos = Tensor2(&data_);
   auto hid_neg = Tensor2(&neg_data_);
   auto gbias = Tensor1(bias_->mutable_grad());
@@ -390,20 +397,21 @@ InnerProductLayer::~InnerProductLayer() {
   delete bias_;
 }
 
-void InnerProductLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  const auto& src = srclayers_[0]->data(this);
+void InnerProductLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(conf, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  const auto& src = srclayers[0]->data(this);
   batchsize_ = src.shape()[0];
   vdim_ = src.count() / batchsize_;
-  hdim_ = layer_proto_.innerproduct_conf().num_output();
-  transpose_ = proto.innerproduct_conf().transpose();
+  hdim_ = layer_conf_.innerproduct_conf().num_output();
+  transpose_ = conf.innerproduct_conf().transpose();
   if (partition_dim() > 0)
-    hdim_ /= npartitions;
+    hdim_ /= srclayers.at(0)->num_partitions();
   data_.Reshape(vector<int>{batchsize_, hdim_});
   grad_.ReshapeLike(data_);
-  weight_ = Param::Create(proto.param(0));
-  bias_ = Param::Create(proto.param(1));
+  weight_ = Param::Create(conf.param(0));
+  bias_ = Param::Create(conf.param(1));
   if (transpose_)
     weight_->Setup(vector<int>{vdim_, hdim_});
   else
@@ -411,9 +419,10 @@ void InnerProductLayer::Setup(const LayerProto& proto, int npartitions) {
   bias_->Setup(vector<int>{hdim_});
 }
 
-void InnerProductLayer::ComputeFeature(int flag, Metric* perf) {
+void InnerProductLayer::ComputeFeature(int flag,
+    const vector<Layer*>& srclayers) {
   auto data = Tensor2(&data_);
-  auto src = Tensor2(srclayers_[0]->mutable_data(this));
+  auto src = Tensor2(srclayers[0]->mutable_data(this));
   auto weight = Tensor2(weight_->mutable_data());
   auto bias = Tensor1(bias_->mutable_data());
   if (transpose_)
@@ -424,8 +433,9 @@ void InnerProductLayer::ComputeFeature(int flag, Metric* perf) {
   data += expr::repmat(bias, batchsize_);
 }
 
-void InnerProductLayer::ComputeGradient(int flag, Metric* perf) {
-  auto src = Tensor2(srclayers_[0]->mutable_data(this));
+void InnerProductLayer::ComputeGradient(int flag,
+    const vector<Layer*>& srclayers) {
+  auto src = Tensor2(srclayers[0]->mutable_data(this));
   auto grad = Tensor2(&grad_);
   auto weight = Tensor2(weight_->mutable_data());
   auto gweight = Tensor2(weight_->mutable_grad());
@@ -436,8 +446,8 @@ void InnerProductLayer::ComputeGradient(int flag, Metric* perf) {
     gweight = dot(src.T(), grad);
   else
     gweight = dot(grad.T(), src);
-  if (srclayers_[0]->mutable_grad(this) != nullptr) {
-    auto gsrc = Tensor2(srclayers_[0]->mutable_grad(this));
+  if (srclayers[0]->mutable_grad(this) != nullptr) {
+    auto gsrc = Tensor2(srclayers[0]->mutable_grad(this));
     if (transpose_)
       gsrc = dot(grad, weight.T());
     else
@@ -445,15 +455,15 @@ void InnerProductLayer::ComputeGradient(int flag, Metric* perf) {
   }
 }
 /***************** Implementation for LRNLayer *************************/
-void LRNLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  lsize_ = proto.lrn_conf().local_size();
+void LRNLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) {
+  Layer::Setup(conf, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  lsize_ = conf.lrn_conf().local_size();
   CHECK_EQ(lsize_ % 2, 1) << "LRN only supports odd values for Localvol";
-  knorm_ = proto.lrn_conf().knorm();
-  alpha_ = proto.lrn_conf().alpha();
-  beta_ = proto.lrn_conf().beta();
-  const vector<int>& s = srclayers_[0]->data(this).shape();
+  knorm_ = conf.lrn_conf().knorm();
+  alpha_ = conf.lrn_conf().alpha();
+  beta_ = conf.lrn_conf().beta();
+  const vector<int>& s = srclayers[0]->data(this).shape();
   data_.Reshape(s);
   grad_.Reshape(s);
   norm_.Reshape(s);
@@ -463,9 +473,9 @@ void LRNLayer::Setup(const LayerProto& proto, int npartitions) {
   width_ = s[3];
 }
 
-void LRNLayer::ComputeFeature(int flag, Metric* perf) {
+void LRNLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   const float salpha = alpha_ / lsize_;
-  auto src = Tensor4(srclayers_[0]->mutable_data(this));
+  auto src = Tensor4(srclayers[0]->mutable_data(this));
   auto data = Tensor4(&data_);
   auto norm = Tensor4(&norm_);
   // stores normalizer without power
@@ -474,12 +484,12 @@ void LRNLayer::ComputeFeature(int flag, Metric* perf) {
   data = src * expr::F<op::power>(norm, -beta_);
 }
 
-void LRNLayer::ComputeGradient(int flag, Metric* perf) {
+void LRNLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   const float salpha = alpha_ / lsize_;
-  auto src = Tensor4(srclayers_[0]->mutable_data(this));
+  auto src = Tensor4(srclayers[0]->mutable_data(this));
   auto norm = Tensor4(&norm_);
   auto grad = Tensor4(&grad_);
-  auto gsrc = Tensor4(srclayers_[0]->mutable_grad(this));
+  auto gsrc = Tensor4(srclayers[0]->mutable_grad(this));
 
   gsrc = grad * expr::F<op::power>(norm, -beta_);
   gsrc += (- 2.0f * beta_ * salpha) * expr::chpool<red::sum>(
@@ -487,18 +497,19 @@ void LRNLayer::ComputeGradient(int flag, Metric* perf) {
 }
 
 /******************** Implementation for PoolingLayer******************/
-void PoolingLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  PoolingProto pool_conf = proto.pooling_conf();
+void PoolingLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(conf, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  PoolingProto pool_conf = conf.pooling_conf();
   kernel_ = pool_conf.kernel();
   stride_ = pool_conf.stride();
   CHECK_LT(pad_, kernel_);
-  pool_ = proto.pooling_conf().pool();
+  pool_ = conf.pooling_conf().pool();
   CHECK(pool_ == PoolingProto_PoolMethod_AVG
         || pool_ == PoolingProto_PoolMethod_MAX)
         << "Padding implemented only for average and max pooling.";
-  const auto& srcshape = srclayers_[0]->data(this).shape();
+  const auto& srcshape = srclayers[0]->data(this).shape();
   int dim = srcshape.size();
   CHECK_GT(dim, 2);
   width_ = srcshape[dim - 1];
@@ -515,8 +526,8 @@ void PoolingLayer::Setup(const LayerProto& proto, int npartitions) {
   grad_.ReshapeLike(data_);
 }
 
-void PoolingLayer::ComputeFeature(int flag, Metric* perf) {
-  auto src = Tensor4(srclayers_[0]->mutable_data(this));
+void PoolingLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
+  auto src = Tensor4(srclayers[0]->mutable_data(this));
   auto data = Tensor4(&data_);
   if (pool_ == PoolingProto_PoolMethod_MAX)
     data = expr::pool<red::maximum>(src, kernel_, stride_);
@@ -529,9 +540,9 @@ void PoolingLayer::ComputeFeature(int flag, Metric* perf) {
  * partition only on num/channel dim
  * assume grad and data have the same paritition
  */
-void PoolingLayer::ComputeGradient(int flag, Metric* perf) {
-  auto src = Tensor4(srclayers_[0]->mutable_data(this));
-  auto gsrc = Tensor4(srclayers_[0]->mutable_grad(this));
+void PoolingLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
+  auto src = Tensor4(srclayers[0]->mutable_data(this));
+  auto gsrc = Tensor4(srclayers[0]->mutable_grad(this));
   auto data = Tensor4(&data_);
   auto grad = Tensor4(&grad_);
   if (pool_ == PoolingProto_PoolMethod_MAX)
@@ -543,101 +554,99 @@ void PoolingLayer::ComputeGradient(int flag, Metric* perf) {
 
 /***************** Implementation of CPoolingLayer ***************/
 
-void CPoolingLayer::Setup(const LayerProto& proto, int npartitions) {
-  PoolingLayer::Setup(proto, npartitions);
+void CPoolingLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  PoolingLayer::Setup(conf, srclayers);
   if (pool_ == PoolingProto_PoolMethod_MAX)
       mask_.ReshapeLike(data_);
 }
-void CPoolingLayer::ComputeFeature(int flag, Metric* perf) {
+void CPoolingLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   if (pool_ == PoolingProto_PoolMethod_MAX)
-    ForwardMaxPooling(srclayers_[0]->mutable_data(this)->mutable_cpu_data(),
+    ForwardMaxPooling(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
         batchsize_, channels_, height_, width_, kernel_, kernel_, pad_, pad_,
         stride_, stride_, data_.mutable_cpu_data(), mask_.mutable_cpu_data());
   else if (pool_ == PoolingProto_PoolMethod_AVG)
-    ForwardAvgPooling(srclayers_[0]->mutable_data(this)->mutable_cpu_data(),
+    ForwardAvgPooling(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
         batchsize_, channels_, height_, width_, kernel_, kernel_, pad_, pad_,
         stride_, stride_, data_.mutable_cpu_data());
   else
     LOG(FATAL) << "unknow pooling method";
 }
 
-void CPoolingLayer::ComputeGradient(int flag, Metric* perf) {
+void CPoolingLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   if (pool_ == PoolingProto_PoolMethod_MAX)
     BackwardMaxPooling(grad_.cpu_data(), mask_.cpu_data(), batchsize_,
         channels_, height_, width_, kernel_, kernel_, pad_, pad_,
-        stride_, stride_,srclayers_[0]->mutable_grad(this)->mutable_cpu_data());
+        stride_, stride_, srclayers[0]->mutable_grad(this)->mutable_cpu_data());
   else if (pool_ == PoolingProto_PoolMethod_AVG)
     BackwardAvgPooling(grad_.cpu_data(), batchsize_,
         channels_, height_, width_, kernel_, kernel_, pad_, pad_,
-        stride_, stride_,srclayers_[0]->mutable_grad(this)->mutable_cpu_data());
+        stride_, stride_, srclayers[0]->mutable_grad(this)->mutable_cpu_data());
   else
     LOG(FATAL) << "unknow pooling method";
 }
 
 /***************** Implementation for ReLULayer *****************************/
-void ReLULayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  data_.ReshapeLike(srclayers_[0]->data(this));
-  grad_.ReshapeLike(*(srclayers_[0]->mutable_grad(this)));
+void ReLULayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(conf, srclayers);
+  data_.ReshapeLike(srclayers[0]->data(this));
+  grad_.ReshapeLike(*(srclayers[0]->mutable_grad(this)));
 }
 
-void ReLULayer::ComputeFeature(int flag, Metric* perf) {
+void ReLULayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   auto data = Tensor1(&data_);
-  auto src = Tensor1(srclayers_[0]->mutable_data(this));
+  auto src = Tensor1(srclayers[0]->mutable_data(this));
   data = expr::F<op::relu>(src);
 }
 
-void ReLULayer::ComputeGradient(int flag, Metric* perf) {
+void ReLULayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   auto data = Tensor1(&data_);
   auto grad = Tensor1(&grad_);
-  auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this));
+  auto gsrc = Tensor1(srclayers[0]->mutable_grad(this));
   gsrc = expr::F<op::relu_grad>(data)*grad;
 }
 
 /*******************Implementation of SigmoidLayer***************************/
-void SigmoidLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  data_.ReshapeLike(srclayers_[0]->data(this));
-  grad_.ReshapeLike(srclayers_[0]->grad(this));
+void SigmoidLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(conf, srclayers);
+  data_.ReshapeLike(srclayers[0]->data(this));
+  grad_.ReshapeLike(srclayers[0]->grad(this));
 }
 
-void SigmoidLayer::ComputeFeature(int flag, Metric* perf) {
+void SigmoidLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   auto data = Tensor1(&data_);
-  auto src = Tensor1(srclayers_[0]->mutable_data(this));
+  auto src = Tensor1(srclayers[0]->mutable_data(this));
   data = expr::F<op::sigmoid>(src);
 }
 
-void SigmoidLayer::ComputeGradient(int flag, Metric* perf) {
+void SigmoidLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   auto data = Tensor1(&data_);
   auto grad = Tensor1(&grad_);
-  auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this));
+  auto gsrc = Tensor1(srclayers[0]->mutable_grad(this));
   gsrc = expr::F<op::sigmoid_grad>(data) * grad;
 }
 /*******************Implementation of TanLayer***************************/
-void STanhLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  data_.ReshapeLike(srclayers_[0]->data(this));
-  grad_.ReshapeLike(srclayers_[0]->grad(this));
+void STanhLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  Layer::Setup(conf, srclayers);
+  data_.ReshapeLike(srclayers[0]->data(this));
+  grad_.ReshapeLike(srclayers[0]->grad(this));
 }
 
-void STanhLayer::ComputeFeature(int flag, Metric* perf) {
+void STanhLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   auto data = Tensor1(&data_);
-  auto src = Tensor1(srclayers_[0]->mutable_data(this));
+  auto src = Tensor1(srclayers[0]->mutable_data(this));
   data = expr::F<op::stanh>(src);
 }
 
-void STanhLayer::ComputeGradient(int flag, Metric* perf) {
+void STanhLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   auto data = Tensor1(&data_);
   auto grad = Tensor1(&grad_);
-  auto gsrc = Tensor1(srclayers_[0]->mutable_grad(this));
+  auto gsrc = Tensor1(srclayers[0]->mutable_grad(this));
   gsrc = expr::F<op::stanh_grad>(data) * grad;
 }
-/********* Implementation for BridgeDstLayer **************/
-void BridgeDstLayer::Setup(const LayerProto& proto, int npartitions) {
-  Layer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  data_.Reshape(srclayers_[0]->data(this).shape());
-  grad_.ReshapeLike(data_);
-}
+
 
 }  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/proto/job.proto
----------------------------------------------------------------------
diff --git a/src/proto/job.proto b/src/proto/job.proto
index dc202d9..950f785 100644
--- a/src/proto/job.proto
+++ b/src/proto/job.proto
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -59,9 +59,9 @@ message JobProto {
   // TODO(wangwei): set -1 for test forever
   optional int32 test_steps =  21 [default = 0];
   // frequency of validation, e.g., do validation every 100 training steps
-  optional int32 valid_freq = 25 [default = 0];
+  optional int32 validate_freq = 25 [default = 0];
   // total num of steps for validating all validation data
-  optional int32 valid_steps = 26 [default = 0];
+  optional int32 validate_steps = 26 [default = 0];
   // frequency of checkpoint
   optional int32 checkpoint_freq = 30 [default = 0];
 
@@ -83,7 +83,7 @@ message JobProto {
   // start test after this num steps
   optional int32 test_after = 82 [default = 0];
   // start validation after this num steps
-  optional int32 valid_after = 83 [default = 0];
+  optional int32 validate_after = 83 [default = 0];
 
   // for internal use
   // users typically do not touch following fields
@@ -224,6 +224,8 @@ message LayerProto {
   optional int32 partition_dim = 60 [default = -1];
   // names of parameters shared from other layers
   optional int32 partition_id = 90 [default = 0];
+  // num of partitions for this layer
+  optional int32 num_partitions = 91 [default = 1];
 
   extensions 101 to 200;
 }
@@ -571,7 +573,7 @@ enum PartitionType {
 enum Phase {
   kUnknown = 0;
   kTrain = 1;
-  kValidation = 2;
+  kVal = 2;
   kTest= 4;
   // postivie phase for contrastive divergence algorithm
   kPositive = 8;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/server.cc
----------------------------------------------------------------------
diff --git a/src/server.cc b/src/server.cc
new file mode 100644
index 0000000..3e0f4cb
--- /dev/null
+++ b/src/server.cc
@@ -0,0 +1,269 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include "./server.h"
+
+#include <thread>
+#include <chrono>
+#include "mshadow/tensor.h"
+#include "proto/common.pb.h"
+#include "utils/param.h"
+#include "utils/singleton.h"
+#include "utils/factory.h"
+#include "utils/cluster.h"
+
+namespace singa {
+
+using namespace mshadow;
+using std::vector;
+
+Server::Server(int group_id, int server_id,
+    const JobProto& job_conf,
+    const vector<int>& slice2group,
+    const vector<int>& slice2server) {
+  grp_id_ = group_id;
+  id_ = server_id;
+  updater_ = Updater::Create(job_conf.updater());
+  slice2group_ = slice2group;
+  slice2server_ = slice2server;
+}
+
+Server::~Server() {
+  delete updater_;
+  // free Params (i.e., slices) in server shard
+  for (auto entry : shard_)
+    for (auto param : entry.second->shares)
+      delete param;
+}
+
+void Stop(void* running) {
+  *static_cast<bool *>(running) = false;
+}
+
+void Server::Run() {
+  LOG(ERROR) << "Server (group = " << grp_id_ <<", id = " << id_ << ") start";
+  auto cluster = Cluster::Get();
+  if (cluster->nserver_groups()) {
+    CHECK_GT(slice2group_.size(), 0);
+    if (cluster->nservers_per_group()) {
+      CHECK_GT(slice2server_.size(), 0);
+    }
+  }
+  n_updates_.resize(slice2group_.size(), 0);
+  n_pending_sync_.resize(slice2group_.size(), 0);
+  last_sync_.resize(slice2group_.size());
+
+  // TODO(wangsh): give each dealer a unique id
+  auto dealer = new Dealer(0);
+  CHECK(dealer->Connect(kInprocRouterEndpoint));
+  Msg* ping = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
+  ping->set_type(kConnect);
+  dealer->Send(&ping);
+
+  bool running = true;
+  CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running));
+  Poller poll(dealer);
+  // start recv loop and process requests
+  while (running) {
+    // must use poller here; otherwise Receive() gets stuck after workers stop.
+    auto* sock = poll.Wait(cluster->poll_time());
+    if (poll.Terminated()) {
+      LOG(ERROR) << "Connection broken!";
+      exit(0);
+    } else if (sock == nullptr) {
+      continue;
+    }
+    Msg* msg = dealer->Receive();
+    if (msg == nullptr) break;  // interrupted
+    Msg* response = nullptr;
+    int type = msg->type();
+    int slice_id = SliceID(msg->trgt_val());
+    if (type == kPut) {
+      response = HandlePut(&msg);
+    } else if (shard_.find(slice_id) == shard_.end()) {
+      // TODO(wangsh): buffer the msg instead, and process it after the
+      //               corresponding put request is done
+      // delay the processing by re-queueing the msg. May sleep for a while?
+      response = msg;
+    } else {
+      switch (type) {
+        case kGet:
+          response = HandleGet(&msg);
+          break;
+        case kUpdate:
+          for (auto reply : HandleUpdate(&msg))
+            dealer->Send(&reply);
+          break;
+        case kSyncRequest:
+          response = HandleSyncRequest(&msg);
+          break;
+        case kSyncResponse:
+          HandleSyncResponse(&msg);
+          break;
+        default:
+          LOG(ERROR) << "Unknown message type: " << type;
+          break;
+      }
+    }
+    if (response != nullptr)
+      dealer->Send(&response);
+  }
+
+  // send stop msg to stub
+  Msg* msg = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
+  msg->set_type(kStop);
+  dealer->Send(&msg);
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+  LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops";
+  delete dealer;
+}
+
+Msg* Server::HandlePut(Msg **msg) {
+  int version = (*msg)->trgt_version();
+  int slice_id = SliceID((*msg)->trgt_val());
+  if (shard_.find(slice_id) != shard_.end())
+    LOG(FATAL) << "Param (" << slice_id << ") is put more than once";
+
+  // TODO(wangwei) replace hard coded param type 0
+  auto  param = Singleton<Factory<Param>>::Instance()->Create(0);
+  auto response = param->HandlePutMsg(msg, true);
+  // parse num of shares of this param from a worker group
+  int num_shares = 1;
+  if ((*msg)->NextFrame())
+    (*msg)->ParseFormatFrame("i", &num_shares);
+  DeleteMsg(msg);
+  shard_[slice_id] = new ParamEntry(num_shares, param);
+  // must set version after HandlePutMsg which allocates the memory
+  param->set_version(version);
+  param->set_local_version(version);
+  param->set_id(slice_id);
+  // allocate blob for param sync between groups.
+  if (slice2group_[slice_id] != grp_id_) {
+    last_sync_[slice_id].ReshapeLike(param->data());
+    last_sync_[slice_id].CopyFrom(param->data());
+  }
+  LOG(INFO) << "server (group = " << grp_id_ << ", id = " << id_
+            <<") put slice=" << slice_id << " size=" << param->size();
+  return response;
+}
+
+Msg* Server::HandleGet(Msg **msg) {
+  int val = (*msg)->trgt_val();
+  auto param = shard_.at(SliceID(val))->shares.at(0);
+  // re-queue the request if the param is not updated to the required version
+  if (param->version() < (*msg)->trgt_version()) {
+    return *msg;
+  } else {
+    // LOG(ERROR) << "get " << slice << " from "<<(*msg)->src_first();
+    auto reply = param->HandleGetMsg(msg, false);
+    reply->set_trgt(val, param->version());
+    return reply;
+  }
+}
+
+const vector<Msg*> Server::HandleUpdate(Msg **msg) {
+  vector<Msg*> ret;
+  int sliceid = SliceID((*msg)->trgt_val());
+  auto entry = shard_.at(sliceid);
+  buffer_requests_[sliceid].push_back(*msg);
+  int num_update;
+  (*msg)->LastFrame();
+  (*msg)->ParseFormatFrame("i", &num_update);
+  (*msg)->FirstFrame();
+  entry->num_update += num_update;
+  // LOG(ERROR) << "update "<< sliceid << " from " << AddrGrp((*msg)->src())
+  //            << ", " << num_update << " total " << entry->num_total;
+  // update only after recv gradients from all shares of this param/slice
+  if (entry->num_update >= entry->num_total) {
+    CHECK_EQ(entry->num_update, entry->num_total);
+    auto& request = buffer_requests_.at(sliceid);
+    int step = (*msg)->trgt_version();
+    int trgt_val = (*msg)->trgt_val();
+    auto param = entry->shares.at(0);
+    // extract and aggregate gradients
+    param->ParseUpdateMsgs(request);
+    updater_->Update(step, param, 1.0f / entry->num_total);
+    param->set_local_version(param->local_version() + 1);
+    // response to all shares of this param
+    for (auto response : param->GenUpdateResponseMsgs(&request, false)) {
+      response->set_trgt(trgt_val, param->local_version());
+      ret.push_back(response);
+    }
+    entry->num_update = 0;
+    n_updates_[sliceid]++;
+    // sync with master group after at least sync_freq local updates
+    // the last check is to avoid sending msg to stopped servers
+    if (slice2group_[sliceid] != grp_id_
+        && n_updates_[sliceid] >= Cluster::Get()->sync_freq()
+        && n_pending_sync_[sliceid] <= Cluster::Get()->sync_freq()) {
+      auto shape = Shape1(param->size());
+      Tensor<cpu, 1> tmp(last_sync_[sliceid].mutable_cpu_data(), shape);
+      Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
+      tmp = cur - tmp;
+      int addr = Addr(slice2group_[sliceid], slice2server_[sliceid], kServer);
+      Msg* sync = new Msg(Addr(grp_id_, id_, kServer), addr);
+      sync->set_type(kSyncRequest);
+      sync->set_trgt(trgt_val, param->local_version());
+      sync->AddFrame(tmp.dptr, param->size() * sizeof(float));
+      Copy(tmp, cur);
+      ret.push_back(sync);
+      n_updates_[sliceid] = 0;
+      n_pending_sync_[sliceid]++;
+    }
+  }
+  // message already pushed to buffer, just need to reset the pointer
+  *msg = nullptr;
+  return ret;
+}
+
+Msg* Server::HandleSyncRequest(Msg **msg) {
+  Msg* msgg = *msg;
+  int slice = SliceID(msgg->trgt_val());
+  auto param = shard_.at(slice)->shares.at(0);
+  auto shape = Shape1(param->size());
+  CHECK_EQ(msgg->FrameSize(), param->size()*sizeof(float));
+  Tensor<cpu, 1> inc(static_cast<float*>(msgg->FrameData()), shape);
+  Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
+  // recv sync msg on the slice I am maintaining
+  cur += inc;
+  msgg->SwapAddr();
+  msgg->set_type(kSyncResponse);
+  // copy the fresh param value into the response msg
+  Copy(inc, cur);
+  return msgg;
+}
+
+// recv sync msg on slice mastered by others
+void Server::HandleSyncResponse(Msg **msg) {
+  Msg* msgg = *msg;
+  int slice = SliceID(msgg->trgt_val());
+  auto param = shard_.at(slice)->shares.at(0);
+  auto shape = Shape1(param->size());
+  Tensor<cpu, 1> prev(last_sync_[param->id()].mutable_cpu_data(), shape);
+  Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
+  Tensor<cpu, 1> master(static_cast<float*>(msgg->FrameData()), shape);
+  cur += master - prev;  // cur = master + (cur - prev);
+  Copy(prev, cur);
+  DeleteMsg(msg);
+  n_pending_sync_[slice]--;
+}
+
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/stub.cc
----------------------------------------------------------------------
diff --git a/src/stub.cc b/src/stub.cc
new file mode 100644
index 0000000..7b439e5
--- /dev/null
+++ b/src/stub.cc
@@ -0,0 +1,285 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include "./stub.h"
+
+#include <glog/logging.h>
+#include <unistd.h>
+#include <map>
+#include <thread>
+#include <set>
+#include "mshadow/tensor.h"
+#include "proto/common.pb.h"
+#include "utils/cluster.h"
+#include "utils/common.h"
+#include "utils/tinydir.h"
+
+namespace singa {
+
+using std::vector;
+using std::string;
+
+/***********************Stub****************************/
+Stub::~Stub() {
+  delete router_;
+}
+void Stub::Setup() {
+  router_ = new Router();
+  router_->Bind(kInprocRouterEndpoint);
+  auto cluster = Cluster::Get();
+  const string hostip = cluster->hostip();
+  int port = router_->Bind("tcp://" + hostip + ":*");
+  endpoint_ = hostip + ":" + std::to_string(port);
+}
+/**
+ * Get a hash id for a Param object from a group.
+ *
+ * Simply multiply group_id by a large prime number 997 (assuming there are
+ * no more than 997 worker groups) and add the owner param id.
+ */
+inline int Hash(int grp_id, int param_id) {
+  return grp_id * 997 + param_id;
+}
+const std::unordered_map<int, ParamEntry*>  CreateParamShard(
+    const vector<Worker*>& workers) {
+  std::unordered_map<int, ParamEntry*> shard;
+  // grp id -> net
+  std::unordered_map<int, NeuralNet*> grp2net;
+  // grp id -> worker id range
+  std::unordered_map<int, std::pair<int, int>> grp2workers;
+  for (auto worker : workers) {
+    int grp = worker->grp_id(), id = worker->id();
+    if (grp2net.find(grp) == grp2net.end()) {
+      grp2net[grp] = worker->train_net();
+      grp2workers[grp] = std::make_pair(id, id + 1);
+    } else {
+      CHECK_EQ(grp2net[grp], worker->train_net());
+      int start = grp2workers[grp].first, end = grp2workers[grp].second;
+      if (start > id) start = id;
+      if (end < id + 1) end = id + 1;
+      grp2workers[grp] = std::make_pair(start, end);
+    }
+  }
+
+  for (const auto entry : grp2net) {
+    int grp = entry.first;
+    int wstart = grp2workers[grp].first, wend = grp2workers[grp].second;
+    for (auto layer : entry.second->layers()) {
+      int partition = layer->partition_id();
+      bool local =  partition >= wstart && partition < wend;
+      for (auto param : layer->GetParams()) {
+        int hash = Hash(grp, param->owner());
+        if (shard.find(hash) == shard.end())
+          shard[hash] = new ParamEntry();
+        shard[hash]->AddParam(local, param);
+      }
+    }
+  }
+  return shard;
+}
+
+void Stub::Run(const vector<int>& slice2server,
+    const vector<Worker*>& workers, const vector<Server*>& servers) {
+  slice2server_ = slice2server;
+  int nworkers = workers.size(), nservers = servers.size();
+  auto cluster = Cluster::Get();
+  int procs_id = cluster->procs_id();
+  LOG(INFO) << "Stub in process " << procs_id << " starts";
+  auto shard = CreateParamShard(workers);
+  std::map<int, Dealer*> inter_dealers;  // for sending msg to other procs
+  std::queue<Msg*> msg_queue;
+  while (true) {
+    Msg* msg = nullptr;
+    if (msg_queue.empty()) {
+      msg = router_->Receive();
+    } else {
+      msg = msg_queue.front();
+      msg_queue.pop();
+    }
+    int type = msg->type(), dst = msg->dst(), flag = AddrType(dst);
+    if (flag == kStub && (AddrProc(dst) == procs_id || AddrGrp(dst) == -1)) {
+      //  the following statements are ordered!
+      if (type == kConnect) {
+        DeleteMsg(&msg);
+      } else if (type == kStop) {
+        int src_flag = AddrType(msg->src());
+        if (src_flag == kServer) nservers--;
+        else if (src_flag == kWorkerParam) nworkers--;
+        DeleteMsg(&msg);
+        if (nworkers == 0 && nservers == 0) break;
+      } else {
+        int grp;
+        int paramid = ParamID(msg->trgt_val());
+        ParamEntry *entry = nullptr;
+        switch (type) {
+          case kUpdate:
+            grp = AddrGrp(msg->src());
+            entry = shard.at(Hash(grp, paramid));
+            for (auto update_msg : HandleUpdateRequest(entry, &msg))
+              msg_queue.push(update_msg);
+            break;
+          case kRUpdate:
+            grp = AddrGrp(msg->dst());
+            entry = shard.at(Hash(grp, paramid));
+            HandleUpdateResponse(entry, &msg);
+            break;
+          case kGet:
+            grp = AddrGrp(msg->src());
+            entry = shard.at(Hash(grp, paramid));
+            for (auto get_msg : HandleGetRequest(entry, &msg))
+              msg_queue.push(get_msg);
+            break;
+          case kRGet:
+            grp = AddrGrp(msg->dst());
+            entry = shard.at(Hash(grp, paramid));
+            HandleGetResponse(entry, &msg);
+            break;
+          case kPut:
+            grp = AddrGrp(msg->src());
+            entry = shard.at(Hash(grp, paramid));
+            for (auto put_msg : HandlePutRequest(entry, &msg))
+              msg_queue.push(put_msg);
+            break;
+          default:
+            LOG(ERROR) << "Unknow message type:" << type;
+            break;
+        }
+      }
+    } else {
+      int dst_procs = AddrProc(dst);
+      if (flag != kStub)
+        dst_procs = cluster->ProcsIDOf(AddrGrp(dst), AddrID(dst), flag);
+      if (dst_procs != procs_id) {
+        if (inter_dealers.find(dst_procs) == inter_dealers.end())
+          inter_dealers[dst_procs] = CreateInterProcsDealer(dst_procs);
+        inter_dealers[dst_procs]->Send(&msg);
+      } else {
+        router_->Send(&msg);
+      }
+    }
+  }
+  LOG(ERROR) << "Stub in process " << procs_id << " stops";
+  for (auto& entry : inter_dealers)
+    delete entry.second;
+}
+
+Dealer* Stub::CreateInterProcsDealer(int dst_procs) {
+  // forward to other procs
+  auto cluster = Cluster::Get();
+  auto dealer = new Dealer();
+  while (cluster->endpoint(dst_procs) == "") {
+    // kCollectSleepTime));
+    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
+    LOG(ERROR) << "waiting for procs " << dst_procs << " to register";
+  }
+  dealer->Connect("tcp://"+cluster->endpoint(dst_procs));
+  return dealer;
+}
+
+void Stub::GenMsgs(int type, int version, ParamEntry* entry, Msg* msg,
+                      vector<Msg*> *ret) {
+  int procs_id = Cluster::Get()->procs_id();
+  int src_grp = AddrGrp(msg->src());
+  int dst_grp = src_grp / Cluster::Get()->nworker_groups_per_server_group();
+  auto param = entry->shares.at(0);
+  for (int idx = 0 ; idx < param->num_slices(); idx++) {
+    int slice_id = param->slice_start() + idx;
+    int server = slice2server_[slice_id];
+    int dst_procs = Cluster::Get()->ProcsIDOf(dst_grp, server, kServer);
+    Msg* new_msg = nullptr;
+    if (type == kPut) {
+      CHECK_GT(entry->num_total, 0);
+      new_msg = param->GenPutMsg(dst_procs != procs_id, idx);
+      new_msg->AddFormatFrame("i", entry->num_total);
+    } else if (type == kGet) {
+      new_msg = param->GenGetMsg(dst_procs != procs_id, idx);
+    } else if (type == kUpdate) {
+      new_msg = param->GenUpdateMsg(dst_procs != procs_id, idx);
+      new_msg->AddFormatFrame("i", entry->num_local);
+    } else {
+      LOG(FATAL) << "Wrong type";
+    }
+    new_msg->set_trgt(ParamTrgt(param->owner(), slice_id), version);
+    new_msg->set_src(Addr(src_grp, procs_id, kStub));
+    new_msg->set_dst(Addr(dst_grp, server, kServer));
+    ret->push_back(new_msg);
+  }
+}
+
+const vector<Msg*> Stub::HandleGetRequest(ParamEntry* entry, Msg** msg) {
+  vector<Msg*> ret;
+  int version = (*msg)->trgt_version();
+  if (version > entry->next_version) {
+    entry->next_version = version;
+    GenMsgs(kGet, version, entry, *msg, &ret);
+  }
+  DeleteMsg(msg);
+  return ret;
+}
+
+const vector<Msg*> Stub::HandleUpdateRequest(ParamEntry *entry, Msg** msg) {
+  vector<Msg*> ret;
+  entry->num_update++;
+  if (entry->num_update >= entry->num_local) {
+    // sum the local gradients of all shares into the first share
+    if (entry->num_local > 1) {
+      auto it = entry->shares.begin();
+      auto shape = mshadow::Shape1((*it)->size());
+      mshadow::Tensor<mshadow::cpu, 1> sum((*it)->mutable_cpu_grad(), shape);
+      for (++it; it != entry->shares.end(); it++) {
+        mshadow::Tensor<mshadow::cpu, 1> grad((*it)->mutable_cpu_grad(), shape);
+        sum += grad;
+      }
+    }
+    int step = (*msg)->trgt_version();
+    GenMsgs(kUpdate, step, entry, *msg, &ret);
+    entry->num_update = 0;
+  }
+  DeleteMsg(msg);
+  return ret;
+}
+
+const vector<Msg*> Stub::HandlePutRequest(ParamEntry* entry, Msg** msg) {
+  vector<Msg*> ret;
+  int version = (*msg)->trgt_version();
+  GenMsgs(kPut, version, entry, *msg, &ret);
+  DeleteMsg(msg);
+  return ret;
+}
+
+void Stub::HandleGetResponse(ParamEntry* entry, Msg** msg) {
+  int version = (*msg)->trgt_version();
+  int sliceid = SliceID((*msg)->trgt_val());
+  auto param = entry->shares.at(0);
+  if (param->ParseGetResponseMsg(*msg, sliceid-param->slice_start()))
+    param->set_version(version);
+  DeleteMsg(msg);
+}
+
+void Stub::HandleUpdateResponse(ParamEntry* entry, Msg** msg) {
+  int version = (*msg)->trgt_version();
+  int sliceid = SliceID((*msg)->trgt_val());
+  auto param = entry->shares.at(0);
+  if (param->ParseUpdateResponseMsg(*msg, sliceid-param->slice_start()))
+    param->set_version(version);
+  DeleteMsg(msg);
+}
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/src/trainer/server.cc
----------------------------------------------------------------------
diff --git a/src/trainer/server.cc b/src/trainer/server.cc
deleted file mode 100644
index 5e74c1b..0000000
--- a/src/trainer/server.cc
+++ /dev/null
@@ -1,263 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-* 
-*   http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "trainer/server.h"
-
-#include <thread>
-#include <chrono>
-#include "mshadow/tensor.h"
-#include "proto/common.pb.h"
-#include "utils/param.h"
-#include "utils/singleton.h"
-#include "utils/factory.h"
-#include "utils/cluster.h"
-
-namespace singa {
-
-using namespace mshadow;
-using std::vector;
-
-Server::Server(int group_id, int server_id) {
-  grp_id_ = group_id;
-  id_ = server_id;
-}
-
-void Server::Setup(const UpdaterProto& proto, const vector<int>& slice2group,
-                   const vector<int>& slice2server) {
-  updater_ = Updater::Create(proto);
-  slice2group_ = slice2group;
-  slice2server_ = slice2server;
-  n_updates_.resize(slice2group_.size(), 0);
-  n_pending_sync_.resize(slice2group_.size(), 0);
-  last_sync_.resize(slice2group_.size());
-}
-
-Server::~Server() {
-  delete updater_;
-  // free Params (i.e., slices) in server shard
-  for (auto entry : shard_)
-    for (auto param : entry.second->shares)
-      delete param;
-}
-
-void Stop(void* running) {
-  *static_cast<bool *>(running) = false;
-}
-
-void Server::Run() {
-  LOG(ERROR) << "Server (group = " << grp_id_ <<", id = " << id_ << ") start";
-  // TODO(wangsh): give each dealer a unique id
-  auto dealer = new Dealer(0);
-  CHECK(dealer->Connect(kInprocRouterEndpoint));
-  Msg* ping = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
-  ping->set_type(kConnect);
-  dealer->Send(&ping);
-
-  auto cluster = Cluster::Get();
-  bool running = true;
-  CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running));
-  Poller poll(dealer);
-  // start recv loop and process requests
-  while (running) {
-    // must use poller here; otherwise Receive() gets stuck after workers stop.
-    auto* sock = poll.Wait(cluster->poll_time());
-    if (poll.Terminated()) {
-      LOG(ERROR) << "Connection broken!";
-      exit(0);
-    } else if (sock == nullptr) {
-      continue;
-    }
-    Msg* msg = dealer->Receive();
-    if (msg == nullptr) break;  // interrupted
-    Msg* response = nullptr;
-    int type = msg->type();
-    int slice_id = SliceID(msg->trgt_val());
-    if (type == kPut) {
-      response = HandlePut(&msg);
-    } else if (shard_.find(slice_id) == shard_.end()) {
-      // TODO(wangsh): buffer the msg instead, and process it after the
-      //               corresponding put request is done
-      // delay the processing by re-queue the msg. May sleep for a while?
-      response = msg;
-    } else {
-      switch (type) {
-        case kGet:
-          response = HandleGet(&msg);
-          break;
-        case kUpdate:
-          for (auto reply : HandleUpdate(&msg))
-            dealer->Send(&reply);
-          break;
-        case kSyncRequest:
-          response = HandleSyncRequest(&msg);
-          break;
-        case kSyncResponse:
-          HandleSyncResponse(&msg);
-          break;
-        default:
-          LOG(ERROR) << "Unknown message type: " << type;
-          break;
-      }
-    }
-    if (response != nullptr)
-      dealer->Send(&response);
-  }
-
-  // send stop msg to stub
-  Msg* msg = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
-  msg->set_type(kStop);
-  dealer->Send(&msg);
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-  LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops";
-  delete dealer;
-}
-
-Msg* Server::HandlePut(Msg **msg) {
-  int version = (*msg)->trgt_version();
-  int slice_id = SliceID((*msg)->trgt_val());
-  if (shard_.find(slice_id) != shard_.end())
-    LOG(FATAL) << "Param (" << slice_id << ") is put more than once";
-
-  // TODO(wangwei) replace hard coded param type 0
-  auto  param = Singleton<Factory<Param>>::Instance()->Create(0);
-  auto response = param->HandlePutMsg(msg, true);
-  // parse num of shares of this param from a worker group
-  int num_shares = 1;
-  if ((*msg)->NextFrame())
-    (*msg)->ParseFormatFrame("i", &num_shares);
-  DeleteMsg(msg);
-  shard_[slice_id] = new ParamEntry(num_shares, param);
-  // must set version after HandlePutMsg which allocates the memory
-  param->set_version(version);
-  param->set_local_version(version);
-  param->set_id(slice_id);
-  // allocate blob for param sync between groups.
-  if (slice2group_[slice_id] != grp_id_) {
-    last_sync_[slice_id].ReshapeLike(param->data());
-    last_sync_[slice_id].CopyFrom(param->data());
-  }
-  LOG(INFO) << "server (group = " << grp_id_ << ", id = " << id_
-            <<") put slice=" << slice_id << " size=" << param->size();
-  return response;
-}
-
-Msg* Server::HandleGet(Msg **msg) {
-  int val = (*msg)->trgt_val();
-  auto param = shard_.at(SliceID(val))->shares.at(0);
-  // re-queue the request if the param is not updated to the required version
-  if (param->version() < (*msg)->trgt_version()) {
-    return *msg;
-  } else {
-    // LOG(ERROR) << "get " << slice << " from "<<(*msg)->src_first();
-    auto reply = param->HandleGetMsg(msg, false);
-    reply->set_trgt(val, param->version());
-    return reply;
-  }
-}
-
-const vector<Msg*> Server::HandleUpdate(Msg **msg) {
-  vector<Msg*> ret;
-  int sliceid = SliceID((*msg)->trgt_val());
-  auto entry = shard_.at(sliceid);
-  buffer_requests_[sliceid].push_back(*msg);
-  int num_update;
-  (*msg)->LastFrame();
-  (*msg)->ParseFormatFrame("i", &num_update);
-  (*msg)->FirstFrame();
-  entry->num_update += num_update;
-  // LOG(ERROR) << "update "<< sliceid << " from " << AddrGrp((*msg)->src())
-  //            << ", " << num_update << " total " << entry->num_total;
-  // do update until recv gradients from all shares of this param/slice
-  if (entry->num_update >= entry->num_total) {
-    CHECK_EQ(entry->num_update, entry->num_total);
-    auto& request = buffer_requests_.at(sliceid);
-    int step = (*msg)->trgt_version();
-    int trgt_val = (*msg)->trgt_val();
-    auto param = entry->shares.at(0);
-    // extract and aggregate gradients
-    param->ParseUpdateMsgs(request);
-    updater_->Update(step, param, 1.0f / entry->num_total);
-    param->set_local_version(param->local_version() + 1);
-    // response to all shares of this param
-    for (auto response : param->GenUpdateResponseMsgs(&request, false)) {
-      response->set_trgt(trgt_val, param->local_version());
-      ret.push_back(response);
-    }
-    entry->num_update = 0;
-    n_updates_[sliceid]++;
-    // sync with master group after at least sync_freq local updates
-    // the last check is to avoid sending msg to stopped servers
-    if (slice2group_[sliceid] != grp_id_
-        && n_updates_[sliceid] >= Cluster::Get()->sync_freq()
-        && n_pending_sync_[sliceid] <= Cluster::Get()->sync_freq()) {
-      auto shape = Shape1(param->size());
-      Tensor<cpu, 1> tmp(last_sync_[sliceid].mutable_cpu_data(), shape);
-      Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
-      tmp = cur - tmp;
-      int addr = Addr(slice2group_[sliceid], slice2server_[sliceid], kServer);
-      Msg* sync = new Msg(Addr(grp_id_, id_, kServer), addr);
-      sync->set_type(kSyncRequest);
-      sync->set_trgt(trgt_val, param->local_version());
-      sync->AddFrame(tmp.dptr, param->size() * sizeof(float));
-      Copy(tmp, cur);
-      ret.push_back(sync);
-      n_updates_[sliceid] = 0;
-      n_pending_sync_[sliceid]++;
-    }
-  }
-  // message already pushed to buffer, just need to reset the pointer
-  *msg = nullptr;
-  return ret;
-}
-
-Msg* Server::HandleSyncRequest(Msg **msg) {
-  Msg* msgg = *msg;
-  int slice = SliceID(msgg->trgt_val());
-  auto param = shard_.at(slice)->shares.at(0);
-  auto shape = Shape1(param->size());
-  CHECK_EQ(msgg->FrameSize(), param->size()*sizeof(float));
-  Tensor<cpu, 1> inc(static_cast<float*>(msgg->FrameData()), shape);
-  Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
-  // recv sync msg on the slice I am maintaining
-  cur += inc;
-  msgg->SwapAddr();
-  msgg->set_type(kSyncResponse);
-  // copy the fresh param value into the response msg
-  Copy(inc, cur);
-  return msgg;
-}
-
-// recv sync msg on slice mastered by others
-void Server::HandleSyncResponse(Msg **msg) {
-  Msg* msgg = *msg;
-  int slice = SliceID(msgg->trgt_val());
-  auto param = shard_.at(slice)->shares.at(0);
-  auto shape = Shape1(param->size());
-  Tensor<cpu, 1> prev(last_sync_[param->id()].mutable_cpu_data(), shape);
-  Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
-  Tensor<cpu, 1> master(static_cast<float*>(msgg->FrameData()), shape);
-  cur += master - prev;  // cur = master + (cur - prev);
-  Copy(prev, cur);
-  DeleteMsg(msg);
-  n_pending_sync_[slice]--;
-}
-
-}  // namespace singa


Mime
View raw message