singa-commits mailing list archives

From wan...@apache.org
Subject [09/13] incubator-singa git commit: SINGA-70 Refactor API of Layer, Worker, Server and Driver
Date Sun, 27 Sep 2015 14:34:32 GMT
SINGA-70 Refactor API of Layer, Worker, Server and Driver

For the Layer class
* Setup, ComputeFeature and ComputeGradient are updated to accept a single
argument that represents all source layers (see the sketch after this list).
* DebugString() is replaced by ToString(), which displays debug and other
info. For example, performance values can be aggregated in the
ComputeFeature function and then converted into a string in ToString().
* The srclayer and dstlayer fields are removed.
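
A minimal sketch of the refactored Layer interface, based on the signatures
used in this commit (the subclass name MyLayer is hypothetical):

    #include <string>
    #include <vector>
    #include "proto/job.pb.h"      // LayerProto
    #include "neuralnet/layer.h"   // singa::Layer

    // Hypothetical subclass using the refactored API. Source layers are now
    // passed in explicitly; they are no longer stored in the removed
    // srclayer/dstlayer fields.
    class MyLayer : public singa::Layer {
     public:
      void Setup(const singa::LayerProto& conf,
                 const std::vector<singa::Layer*>& srclayers) override;
      void ComputeFeature(int flag,
                          const std::vector<singa::Layer*>& srclayers) override;
      void ComputeGradient(int flag,
                           const std::vector<singa::Layer*>& srclayers) override;
      const std::string ToString(bool debug, int flag) override;
    };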

For the Worker class
* The Report function is removed. Performance metrics are now collected via
Layer::ToString() and reported by each worker, as sketched below.
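
A minimal sketch of this pattern, condensed from the rnnlm LossLayer in this
commit (PredictProbability is a hypothetical helper standing in for the
per-word probability computation; members loss_, num_ and window_ follow the
rnnlm example):

    // Accumulate metrics in member fields during ComputeFeature instead of
    // filling in a Metric* argument.
    void LossLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
      for (int t = 0; t < window_; t++) {
        float p = PredictProbability(t, srclayers);  // hypothetical helper
        loss_ += -log(std::max(p, FLT_MIN));
      }
      num_ += window_;
    }

    // The worker collects and reports the aggregated values via ToString().
    const std::string LossLayer::ToString(bool debug, int flag) {
      return "loss = " + std::to_string(loss_ / num_);
    }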

The Trainer class is renamed to Stub
* Only the Run() function and the message handling/generation functions
remain. Functions for creating servers and workers are moved into
Driver.

For the Driver class
* The Submit function is renamed to Train (usage sketched below).
* Functions for creating workers and servers are added.
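
A sketch of the updated application code, condensed from the
examples/rnnlm/main.cc change in this commit (layer registration and flag
parsing for resume are omitted):

    #include "singa.h"  // assumption: aggregates Driver and JobProto headers

    int main(int argc, char** argv) {
      singa::Driver driver;
      driver.Init(argc, argv);
      bool resume = false;  // normally parsed from the command line
      singa::JobProto jobConf = driver.job_conf();
      driver.Train(resume, jobConf);  // was driver.Submit(resume, jobConf)
      return 0;
    }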

All files under the trainer folder are moved up to src/ or include/; the
corresponding include paths change as sketched below.
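
A sketch of the resulting include-path changes (paths are relative to
include/, as listed in the Makefile.am diff below):

    #include "worker.h"       // was "trainer/worker.h"
    #include "server.h"       // was "trainer/server.h"
    #include "stub.h"         // was "trainer/trainer.h"
    #include "comm/msg.h"     // was "communication/msg.h"
    #include "comm/socket.h"  // was "communication/socket.h"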


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/321ef96a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/321ef96a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/321ef96a

Branch: refs/heads/master
Commit: 321ef96a6e90ce7c70fae7e05b446c3dd38a3fef
Parents: ab984da
Author: Wei Wang <wangwei@comp.nus.edu.sg>
Authored: Fri Sep 25 23:31:53 2015 +0800
Committer: Wei Wang <wangwei@comp.nus.edu.sg>
Committed: Sat Sep 26 23:37:36 2015 +0800

----------------------------------------------------------------------
 .gitignore                           |   4 +-
 Makefile.am                          |  27 +-
 examples/rbm/autoencoder.conf        |   8 +-
 examples/rbm/rbm2.conf               |   2 +-
 examples/rbm/rbm3.conf               |   2 +-
 examples/rbm/rbm4.conf               |   2 +-
 examples/rnnlm/job.conf              |   6 +-
 examples/rnnlm/main.cc               |   2 +-
 examples/rnnlm/rnnlm.cc              | 121 ++++----
 examples/rnnlm/rnnlm.h               |  34 ++-
 include/comm/msg.h                   | 238 +++++++++++++++
 include/comm/socket.h                | 174 +++++++++++
 include/communication/msg.h          | 238 ---------------
 include/communication/socket.h       | 174 -----------
 include/driver.h                     |  66 ++++-
 include/neuralnet/connection_layer.h |  47 +--
 include/neuralnet/input_layer.h      |  48 ++-
 include/neuralnet/layer.h            | 212 ++++++++------
 include/neuralnet/loss_layer.h       |  26 +-
 include/neuralnet/neuralnet.h        |  33 ++-
 include/neuralnet/neuron_layer.h     |  80 ++---
 include/server.h                     | 133 +++++++++
 include/singa.h                      |   9 +-
 include/stub.h                       | 109 +++++++
 include/trainer/server.h             | 132 ---------
 include/trainer/trainer.h            | 163 -----------
 include/trainer/worker.h             | 258 ----------------
 include/utils/param.h                |  38 ++-
 include/worker.h                     | 311 ++++++++++++++++++++
 src/comm/msg.cc                      | 215 ++++++++++++++
 src/comm/socket.cc                   | 180 ++++++++++++
 src/communication/msg.cc             | 215 --------------
 src/communication/socket.cc          | 180 ------------
 src/driver.cc                        | 203 ++++++++++++-
 src/main.cc                          |  22 +-
 src/neuralnet/connection_layer.cc    |  66 +++--
 src/neuralnet/input_layer.cc         |  75 ++---
 src/neuralnet/layer.cc               |  19 +-
 src/neuralnet/loss_layer.cc          |  68 +++--
 src/neuralnet/neuralnet.cc           |  21 +-
 src/neuralnet/neuron_layer.cc        | 285 +++++++++---------
 src/proto/job.proto                  |  14 +-
 src/server.cc                        | 269 +++++++++++++++++
 src/stub.cc                          | 285 ++++++++++++++++++
 src/trainer/server.cc                | 263 -----------------
 src/trainer/trainer.cc               | 469 ------------------------------
 src/trainer/worker.cc                | 411 --------------------------
 src/utils/cluster.cc                 |   6 +-
 src/utils/common.cc                  |   8 +-
 src/utils/param.cc                   |  54 +++-
 src/worker.cc                        | 410 ++++++++++++++++++++++++++
 51 files changed, 3330 insertions(+), 3105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 841b0d6..7ac9bc2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,10 +42,10 @@ stamp-h1
 *.status
 config.h
 Makefile
-thirdparty/*
 config/*
 config.h.in
 configure
 aclocal.m4
 Makefile.in
-!thirdpary/install.sh
+thirdparty/*
+!thirdparty/install.sh

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/Makefile.am
----------------------------------------------------------------------
diff --git a/Makefile.am b/Makefile.am
index 3f68e29..00aacdd 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -8,7 +8,7 @@ DEFAULT_FLAGS = -Wall -pthread -fPIC -std=c++11 -Wno-unknown-pragmas \
               $(MSHADOW_FLAGS) -DCPU_ONLY=1 -funroll-loops -DTHREADED
 
 CFLAGS = $(DEBUG)
-CXXFLAGS = $(DEBUG) 
+CXXFLAGS = $(DEBUG)
 AC_CXXFLAGS = $(DEBUG)
 
 INCLUDES = -I$(top_srcdir)/include
@@ -35,9 +35,9 @@ SINGA_SRCS := src/driver.cc \
               src/utils/updater.cc \
               src/utils/data_shard.cc \
               src/utils/blob.cc \
-              src/trainer/server.cc \
-              src/trainer/worker.cc \
-              src/trainer/trainer.cc \
+              src/server.cc \
+              src/worker.cc \
+              src/stub.cc \
               src/neuralnet/layer.cc \
               src/neuralnet/connection_layer.cc \
               src/neuralnet/input_layer.cc \
@@ -45,8 +45,8 @@ SINGA_SRCS := src/driver.cc \
               src/neuralnet/neuron_layer.cc \
               src/neuralnet/output_layer.cc \
               src/neuralnet/neuralnet.cc \
-              src/communication/socket.cc \
-              src/communication/msg.cc
+              src/comm/socket.cc \
+              src/comm/msg.cc
 
 SINGA_HDRS := include/singa.h \
               include/utils/cluster.h \
@@ -60,9 +60,9 @@ SINGA_HDRS := include/singa.h \
               include/utils/blob.h \
               include/utils/updater.h \
               include/utils/tinydir.h \
-              include/trainer/server.h \
-              include/trainer/worker.h \
-              include/trainer/trainer.h \
+              include/server.h \
+              include/worker.h \
+              include/stub.h \
               include/neuralnet/layer.h \
               include/neuralnet/connection_layer.h \
               include/neuralnet/input_layer.h \
@@ -78,8 +78,8 @@ SINGA_HDRS := include/singa.h \
               include/mshadow/cxxnet_op.h \
               include/mshadow/tensor_base.h \
               include/mshadow/tensor_random.h \
-              include/communication/msg.h \
-              include/communication/socket.h
+              include/comm/msg.h \
+              include/comm/socket.h
 
 GTEST_SRCS := include/gtest/gtest-all.cc
 GTEST_HRDS := include/gtest/gtest.h
@@ -108,10 +108,9 @@ endif
 libsinga_la_LDFLAGS = -I./include
 
 
-
 #bin_PROGRAMS = singa
 singa_SOURCES = src/main.cc
-singa_CXXFLAGS = $(DEFAULT_FLAGS) -MMD 
+singa_CXXFLAGS = $(DEFAULT_FLAGS) -MMD
 singa_LDFLAGS = -I./include \
                 -lsinga \
                 -lglog  \
@@ -146,7 +145,7 @@ libgtest_la_LDFLAGS = -I./include
 #bin_PROGRAMS += singatest
 
 singatest_SOURCES = $(GTEST_HDRS) $(TEST_SRCS)
-singatest_CXXFLAGS = $(DEFAULT_FLAGS) 
+singatest_CXXFLAGS = $(DEFAULT_FLAGS)
 singatest_LDFLAGS = -I./include \
                 -lsinga \
                 -lglog  \

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rbm/autoencoder.conf
----------------------------------------------------------------------
diff --git a/examples/rbm/autoencoder.conf b/examples/rbm/autoencoder.conf
index 29f7729..c818c6e 100644
--- a/examples/rbm/autoencoder.conf
+++ b/examples/rbm/autoencoder.conf
@@ -3,10 +3,10 @@ train_steps: 12200
 test_steps:100
 test_freq:1000
 disp_freq:100
-checkpoint_path: "examples/rbm/rbm1/checkpoint/step6000-worker0.bin"
-checkpoint_path: "examples/rbm/rbm2/checkpoint/step6000-worker0.bin"
-checkpoint_path: "examples/rbm/rbm3/checkpoint/step6000-worker0.bin"
-checkpoint_path: "examples/rbm/rbm4/checkpoint/step6000-worker0.bin"
+checkpoint_path: "examples/rbm/rbm1/checkpoint/step6000-worker0"
+checkpoint_path: "examples/rbm/rbm2/checkpoint/step6000-worker0"
+checkpoint_path: "examples/rbm/rbm3/checkpoint/step6000-worker0"
+checkpoint_path: "examples/rbm/rbm4/checkpoint/step6000-worker0"
 train_one_batch{
   alg: kBP
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rbm/rbm2.conf
----------------------------------------------------------------------
diff --git a/examples/rbm/rbm2.conf b/examples/rbm/rbm2.conf
index 8a16e0f..52dc698 100644
--- a/examples/rbm/rbm2.conf
+++ b/examples/rbm/rbm2.conf
@@ -6,7 +6,7 @@ disp_freq: 100
 train_one_batch{
   alg: kCD
 }
-checkpoint_path: "examples/rbm/rbm1/checkpoint/step6000-worker0.bin"
+checkpoint_path: "examples/rbm/rbm1/checkpoint/step6000-worker0"
 updater{
   type: kSGD
   momentum: 0.8

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rbm/rbm3.conf
----------------------------------------------------------------------
diff --git a/examples/rbm/rbm3.conf b/examples/rbm/rbm3.conf
index 75848d6..354fb3b 100644
--- a/examples/rbm/rbm3.conf
+++ b/examples/rbm/rbm3.conf
@@ -6,7 +6,7 @@ disp_freq: 100
 train_one_batch{
   alg: kCD
 }
-checkpoint_path: "examples/rbm/rbm2/checkpoint/step6000-worker0.bin"
+checkpoint_path: "examples/rbm/rbm2/checkpoint/step6000-worker0"
 
 updater{
   type: kSGD

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rbm/rbm4.conf
----------------------------------------------------------------------
diff --git a/examples/rbm/rbm4.conf b/examples/rbm/rbm4.conf
index 2b83afb..ebf39fa 100644
--- a/examples/rbm/rbm4.conf
+++ b/examples/rbm/rbm4.conf
@@ -6,7 +6,7 @@ disp_freq: 100
 train_one_batch{
   alg: kCD
 }
-checkpoint_path: "examples/rbm/rbm3/checkpoint/step6000-worker0.bin"
+checkpoint_path: "examples/rbm/rbm3/checkpoint/step6000-worker0"
 updater{
     type: kSGD
     momentum: 0.8

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rnnlm/job.conf
----------------------------------------------------------------------
diff --git a/examples/rnnlm/job.conf b/examples/rnnlm/job.conf
index db96e84..021692f 100644
--- a/examples/rnnlm/job.conf
+++ b/examples/rnnlm/job.conf
@@ -2,8 +2,8 @@ name: "rnnlm"
 #To scan the training file (81350) 10 times
 train_steps:81350
 #To scan the validation file (6828) once
-valid_steps:683
-valid_freq:8135
+validate_steps:683
+validate_freq:8135
 #disp_freq is specific to training
 disp_freq:8135
 train_one_batch {
@@ -36,7 +36,7 @@ layer {
     path: "examples/rnnlm/train_shard"
     max_window: 10
   }
-  exclude: kValidation
+  exclude: kVal
 }
 
 layer {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rnnlm/main.cc
----------------------------------------------------------------------
diff --git a/examples/rnnlm/main.cc b/examples/rnnlm/main.cc
index 87db06a..ea1dcdd 100644
--- a/examples/rnnlm/main.cc
+++ b/examples/rnnlm/main.cc
@@ -40,6 +40,6 @@ int main(int argc, char **argv) {
 
   singa::JobProto jobConf = driver.job_conf();
 
-  driver.Submit(resume, jobConf);
+  driver.Train(resume, jobConf);
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rnnlm/rnnlm.cc
----------------------------------------------------------------------
diff --git a/examples/rnnlm/rnnlm.cc b/examples/rnnlm/rnnlm.cc
index 0ad6dcd..c086972 100644
--- a/examples/rnnlm/rnnlm.cc
+++ b/examples/rnnlm/rnnlm.cc
@@ -57,19 +57,19 @@ DataLayer::~DataLayer() {
   shard_ = nullptr;
 }
 
-void DataLayer::Setup(const LayerProto& proto, int npartitions) {
-  RNNLayer::Setup(proto, npartitions);
+void DataLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) {
+  RNNLayer::Setup(conf, srclayers);
   shard_ = new singa::DataShard(
-               proto.GetExtension(data_conf).path(),
+               conf.GetExtension(data_conf).path(),
                singa::DataShard::kRead);
   string key;
-  max_window_ = proto.GetExtension(data_conf).max_window();
+  max_window_ = conf.GetExtension(data_conf).max_window();
   records_.resize(max_window_ + 1);  // resize to # of records in data layer
   window_ = 0;
   shard_->Next(&key, &records_[window_]);
 }
 
-void DataLayer::ComputeFeature(int flag, Metric *perf) {
+void DataLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
   CHECK(records_.size() <= shard_->Count());
   records_[0] = records_[window_];
   window_ = max_window_;
@@ -88,17 +88,18 @@ void DataLayer::ComputeFeature(int flag, Metric *perf) {
 }
 
 /*******LabelLayer**************/
-void LabelLayer::Setup(const LayerProto& proto, int npartitions) {
-  RNNLayer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  int max_window = dynamic_cast<DataLayer*>(srclayers_[0])->max_window();
+void LabelLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  RNNLayer::Setup(conf, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  int max_window = dynamic_cast<DataLayer*>(srclayers[0])->max_window();
   data_.Reshape(vector<int>{max_window, 4});
 }
 
-void LabelLayer::ComputeFeature(int flag, Metric *perf) {
-  const auto& records = dynamic_cast<DataLayer*>(srclayers_[0])->records();
+void LabelLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
+  const auto& records = dynamic_cast<DataLayer*>(srclayers[0])->records();
   float *label = data_.mutable_cpu_data();
-  window_ = dynamic_cast<RNNLayer*>(srclayers_[0])->window();
+  window_ = dynamic_cast<RNNLayer*>(srclayers[0])->window();
   for (int i = 0; i < window_; i++) {
     WordRecord wordrecord = records[i + 1].GetExtension(word);
     label[4 * i + 0] = wordrecord.class_start();
@@ -113,20 +114,21 @@ EmbeddingLayer::~EmbeddingLayer() {
   delete embed_;
 }
 
-void EmbeddingLayer::Setup(const LayerProto& proto, int npartitions) {
-  RNNLayer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  int max_window = dynamic_cast<DataLayer*>(srclayers_[0])->max_window();
-  word_dim_ = proto.GetExtension(embedding_conf).word_dim();
+void EmbeddingLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  RNNLayer::Setup(conf, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  int max_window = dynamic_cast<DataLayer*>(srclayers[0])->max_window();
+  word_dim_ = conf.GetExtension(embedding_conf).word_dim();
   data_.Reshape(vector<int>{max_window, word_dim_});
   grad_.ReshapeLike(data_);
-  vocab_size_ = proto.GetExtension(embedding_conf).vocab_size();
-  embed_ = Param::Create(proto.param(0));
+  vocab_size_ = conf.GetExtension(embedding_conf).vocab_size();
+  embed_ = Param::Create(conf.param(0));
   embed_->Setup(vector<int>{vocab_size_, word_dim_});
 }
 
-void EmbeddingLayer::ComputeFeature(int flag, Metric* perf) {
-  auto datalayer = dynamic_cast<DataLayer*>(srclayers_[0]);
+void EmbeddingLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
+  auto datalayer = dynamic_cast<DataLayer*>(srclayers[0]);
   window_ = datalayer->window();
   auto records = datalayer->records();
   auto words = RTensor2(&data_);
@@ -140,10 +142,11 @@ void EmbeddingLayer::ComputeFeature(int flag, Metric* perf) {
   }
 }
 
-void EmbeddingLayer::ComputeGradient(int flag, Metric* perf) {
+void EmbeddingLayer::ComputeGradient(int flag,
+    const vector<Layer*>& srclayers) {
   auto grad = RTensor2(&grad_);
   auto gembed = RTensor2(embed_->mutable_grad());
-  auto datalayer = dynamic_cast<DataLayer*>(srclayers_[0]);
+  auto datalayer = dynamic_cast<DataLayer*>(srclayers[0]);
   auto records = datalayer->records();
   gembed = 0;
   for (int t = 0; t < window_; t++) {
@@ -156,22 +159,23 @@ HiddenLayer::~HiddenLayer() {
   delete weight_;
 }
 
-void HiddenLayer::Setup(const LayerProto& proto, int npartitions) {
-  RNNLayer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 1);
-  const auto& innerproductData = srclayers_[0]->data(this);
-  data_.ReshapeLike(srclayers_[0]->data(this));
-  grad_.ReshapeLike(srclayers_[0]->grad(this));
+void HiddenLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  RNNLayer::Setup(conf, srclayers);
+  CHECK_EQ(srclayers.size(), 1);
+  const auto& innerproductData = srclayers[0]->data(this);
+  data_.ReshapeLike(srclayers[0]->data(this));
+  grad_.ReshapeLike(srclayers[0]->grad(this));
   int word_dim = data_.shape()[1];
-  weight_ = Param::Create(proto.param(0));
+  weight_ = Param::Create(conf.param(0));
   weight_->Setup(std::vector<int>{word_dim, word_dim});
 }
 
 // hid[t] = sigmoid(hid[t-1] * W + src[t])
-void HiddenLayer::ComputeFeature(int flag, Metric* perf) {
-  window_ = dynamic_cast<RNNLayer*>(srclayers_[0])->window();
+void HiddenLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
+  window_ = dynamic_cast<RNNLayer*>(srclayers[0])->window();
   auto data = RTensor2(&data_);
-  auto src = RTensor2(srclayers_[0]->mutable_data(this));
+  auto src = RTensor2(srclayers[0]->mutable_data(this));
   auto weight = RTensor2(weight_->mutable_data());
   for (int t = 0; t < window_; t++) {  // Skip the 1st component
     if (t == 0) {
@@ -184,12 +188,12 @@ void HiddenLayer::ComputeFeature(int flag, Metric* perf) {
   }
 }
 
-void HiddenLayer::ComputeGradient(int flag, Metric* perf) {
+void HiddenLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   auto data = RTensor2(&data_);
   auto grad = RTensor2(&grad_);
   auto weight = RTensor2(weight_->mutable_data());
   auto gweight = RTensor2(weight_->mutable_grad());
-  auto gsrc = RTensor2(srclayers_[0]->mutable_grad(this));
+  auto gsrc = RTensor2(srclayers[0]->mutable_grad(this));
   gweight = 0;
   TensorContainer<cpu, 1> tmp(Shape1(data_.shape()[1]));
   // Check!!
@@ -210,30 +214,30 @@ LossLayer::~LossLayer() {
   delete class_weight_;
 }
 
-void LossLayer::Setup(const LayerProto& proto, int npartitions) {
-  RNNLayer::Setup(proto, npartitions);
-  CHECK_EQ(srclayers_.size(), 2);
-  const auto& src = srclayers_[0]->data(this);
+void LossLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) {
+  RNNLayer::Setup(conf, srclayers);
+  CHECK_EQ(srclayers.size(), 2);
+  const auto& src = srclayers[0]->data(this);
   int max_window = src.shape()[0];
   int vdim = src.count() / max_window;   // Dimension of input
-  int vocab_size = proto.GetExtension(loss_conf).vocab_size();
-  int nclass = proto.GetExtension(loss_conf).nclass();
-  word_weight_ = Param::Create(proto.param(0));
+  int vocab_size = conf.GetExtension(loss_conf).vocab_size();
+  int nclass = conf.GetExtension(loss_conf).nclass();
+  word_weight_ = Param::Create(conf.param(0));
   word_weight_->Setup(vector<int>{vocab_size, vdim});
-  class_weight_ = Param::Create(proto.param(1));
+  class_weight_ = Param::Create(conf.param(1));
   class_weight_->Setup(vector<int>{nclass, vdim});
 
   pword_.resize(max_window);
   pclass_.Reshape(vector<int>{max_window, nclass});
 }
 
-void LossLayer::ComputeFeature(int flag, Metric* perf) {
-  window_ = dynamic_cast<RNNLayer*>(srclayers_[0])->window();
+void LossLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
+  window_ = dynamic_cast<RNNLayer*>(srclayers[0])->window();
   auto pclass = RTensor2(&pclass_);
-  auto src = RTensor2(srclayers_[0]->mutable_data(this));
+  auto src = RTensor2(srclayers[0]->mutable_data(this));
   auto word_weight = RTensor2(word_weight_->mutable_data());
   auto class_weight = RTensor2(class_weight_->mutable_data());
-  const float * label = srclayers_[1]->data(this).cpu_data();
+  const float * label = srclayers[1]->data(this).cpu_data();
 
   float loss = 0.f, ppl = 0.f;
   for (int t = 0; t < window_; t++) {
@@ -254,24 +258,21 @@ void LossLayer::ComputeFeature(int flag, Metric* perf) {
     int cid = static_cast<int>(label[t * 4 + 3]);
     CHECK_GT(end, wid);
     CHECK_GE(wid, start);
-    loss += -log(std::max(pword[wid - start] * pclass[t][cid], FLT_MIN));
-    ppl += log10(std::max(pword[wid - start] * pclass[t][cid], FLT_MIN));
+    loss_ += -log(std::max(pword[wid - start] * pclass[t][cid], FLT_MIN));
+    ppl_ += log10(std::max(pword[wid - start] * pclass[t][cid], FLT_MIN));
   }
-
-  perf->Add("loss", loss, window_);
-  // users can compute the PPL value by 10^(ppl before exp)
-  perf->Add("ppl before exp", ppl, window_);
+  num_ += window_;
 }
 
-void LossLayer::ComputeGradient(int flag, Metric* perf) {
+void LossLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   auto pclass = RTensor2(&pclass_);
-  auto src = RTensor2(srclayers_[0]->mutable_data(this));
-  auto gsrc = RTensor2(srclayers_[0]->mutable_grad(this));
+  auto src = RTensor2(srclayers[0]->mutable_data(this));
+  auto gsrc = RTensor2(srclayers[0]->mutable_grad(this));
   auto word_weight = RTensor2(word_weight_->mutable_data());
   auto gword_weight = RTensor2(word_weight_->mutable_grad());
   auto class_weight = RTensor2(class_weight_->mutable_data());
   auto gclass_weight = RTensor2(class_weight_->mutable_grad());
-  const float * label = srclayers_[1]->data(this).cpu_data();
+  const float * label = srclayers[1]->data(this).cpu_data();
   gclass_weight = 0;
   gword_weight = 0;
   for (int t = 0; t < window_; t++) {
@@ -299,4 +300,10 @@ void LossLayer::ComputeGradient(int flag, Metric* perf) {
     gsrc[t] += dot(pclass[t], class_weight);
   }
 }
+
+const std::string LossLayer::ToString(bool debug, int flag) {
+  float loss = loss_ / num_;
+  float ppl = exp10(- ppl_ / num_);
+  return "loss = " + std::to_string(loss) + ", ppl = " + std::to_string(ppl);
+}
 }   // end of namespace rnnlm

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/examples/rnnlm/rnnlm.h
----------------------------------------------------------------------
diff --git a/examples/rnnlm/rnnlm.h b/examples/rnnlm/rnnlm.h
index b848fa4..ad0918e 100644
--- a/examples/rnnlm/rnnlm.h
+++ b/examples/rnnlm/rnnlm.h
@@ -25,6 +25,7 @@
 #include "./rnnlm.pb.h"
 
 namespace rnnlm {
+using std::vector;
 using singa::LayerProto;
 using singa::Layer;
 using singa::Param;
@@ -57,8 +58,8 @@ class RNNLayer : virtual public Layer {
 class DataLayer : public RNNLayer, public singa::DataLayer {
  public:
   ~DataLayer();
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric *perf) override;
+  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
   int max_window() const {
     return max_window_;
   }
@@ -75,9 +76,9 @@ class DataLayer : public RNNLayer, public singa::DataLayer {
  */
 class LabelLayer : public RNNLayer {
  public:
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric *perf) override;
-  void ComputeGradient(int flag, Metric* perf) override {}
+  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {}
 };
 
 
@@ -88,9 +89,9 @@ class LabelLayer : public RNNLayer {
 class EmbeddingLayer : public RNNLayer {
  public:
   ~EmbeddingLayer();
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric *perf) override;
-  void ComputeGradient(int flag, Metric* perf) override;
+  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override;
   const std::vector<Param*> GetParams() const override {
     std::vector<Param*> params{embed_};
     return params;
@@ -111,9 +112,10 @@ class EmbeddingLayer : public RNNLayer {
 class HiddenLayer : public RNNLayer {
  public:
   ~HiddenLayer();
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric *perf) override;
-  void ComputeGradient(int flag, Metric* perf) override;
+  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override;
+
   const std::vector<Param*> GetParams() const override {
     std::vector<Param*> params{weight_};
     return params;
@@ -132,9 +134,11 @@ class HiddenLayer : public RNNLayer {
 class LossLayer : public RNNLayer {
  public:
   ~LossLayer();
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric *perf) override;
-  void ComputeGradient(int flag, Metric* perf) override;
+  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override;
+
+  const std::string ToString(bool debug, int flag) override;
   const std::vector<Param*> GetParams() const override {
     std::vector<Param*> params{word_weight_, class_weight_};
     return params;
@@ -144,6 +148,8 @@ class LossLayer : public RNNLayer {
   std::vector<Blob<float>> pword_;
   Blob<float> pclass_;
   Param* word_weight_, *class_weight_;
+  float loss_, ppl_;
+  int num_;
 };
 }  // namespace rnnlm
 #endif  // EXAMPLES_RNNLM_RNNLM_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/comm/msg.h
----------------------------------------------------------------------
diff --git a/include/comm/msg.h b/include/comm/msg.h
new file mode 100644
index 0000000..50a9b81
--- /dev/null
+++ b/include/comm/msg.h
@@ -0,0 +1,238 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#ifndef SINGA_COMM_MSG_H_
+#define SINGA_COMM_MSG_H_
+
+// TODO(wangwei): make it a compiler argument
+#define USE_ZMQ
+
+#include <utility>
+#ifdef USE_ZMQ
+#include <czmq.h>
+#endif
+
+namespace singa {
+/**
+ * Wrapper to generate message address
+ * @param grp worker/server group id
+ * @param id_or_proc worker/server id or procs id
+ * @param type msg type
+ */
+inline int Addr(int grp, int id_or_proc, int type) {
+  return (grp << 16) | (id_or_proc << 8) | type;
+}
+
+/**
+ * Parse group id from addr.
+ *
+ * @return group id
+ */
+inline int AddrGrp(int addr) {
+  return addr >> 16;
+}
+
+/**
+ * Parse worker/server id from addr.
+ *
+ * @return id
+ */
+inline int AddrID(int addr) {
+  static const int mask = (1 << 8) - 1;
+  return (addr >> 8) & mask;
+}
+
+/**
+ * Parse worker/server procs from addr.
+ *
+ * @return procs id
+ */
+inline int AddrProc(int addr) {
+  return AddrID(addr);
+}
+
+/**
+ * Parse msg type from addr
+ * @return msg type
+ */
+inline int AddrType(int addr) {
+  static const int mask = (1 << 8) -1;
+  return addr & mask;
+}
+
+/**
+ * Msg used to transfer Param info (gradient or value), feature blob, etc
+ * between workers, stubs and servers.
+ *
+ * Each msg has a source addr and dest addr identified by a unique integer.
+ * It is also associated with a target field (value and version) for ease of
+ * getting some meta info (e.g., parameter id) from the msg.
+ *
+ * Other data is added into the message as frames.
+ */
+class Msg {
+ public:
+  ~Msg();
+  Msg();
+  /**
+   * Construct the msg providing source and destination addr.
+   */
+  Msg(int src, int dst);
+  /**
+   * Copy constructor.
+   */
+  Msg(const Msg& msg);
+  /**
+   * Swap the src/dst addr
+   */
+  void SwapAddr();
+  /**
+   * Add a frame (a chunck of bytes) into the message
+   */
+  void AddFrame(const void* addr, int nBytes);
+  /**
+   * @return num of bytes of the current frame.
+   */
+  int FrameSize();
+  /**
+   * @return the pointer to the current frame data.
+   */
+  void* FrameData();
+  /**
+   * @return the data of the current frame as c string
+   */
+  char* FrameStr();
+  /**
+   * Move the cursor to the first frame.
+   */
+  void FirstFrame();
+  /**
+   * Move the cursor to the last frame.
+   */
+  void LastFrame();
+  /**
+   * Move the cursor to the next frame
+   * @return true if the next frame is not NULL; otherwise false
+   */
+  bool NextFrame();
+  /**
+   *  Add a 'format' frame to the msg (like CZMQ's zsock_send).
+   *
+   *  The format is a string that defines the type of each field.
+   *  The format can contain any of these characters, each corresponding to
+   *  one or two arguments:
+   *  i = int (signed)
+   *  1 = uint8_t
+   *  2 = uint16_t
+   *  4 = uint32_t
+   *  8 = uint64_t
+   *  p = void * (sends the pointer value, only meaningful over inproc)
+   *  s = char**
+   *
+   *  Returns size of the added content.
+   */
+  int AddFormatFrame(const char *format, ...);
+  /**
+   *  Parse the current frame added using AddFormatFrame(const char*, ...).
+   *
+   *  The format is a string that defines the type of each field.
+   *  The format can contain any of these characters, each corresponding to
+   *  one or two arguments:
+   *  i = int (signed)
+   *  1 = uint8_t
+   *  2 = uint16_t
+   *  4 = uint32_t
+   *  8 = uint64_t
+   *  p = void * (sends the pointer value, only meaningful over inproc)
+   *  s = char**
+   *
+   *  Returns size of the parsed content.
+   */
+  int ParseFormatFrame(const char* format, ...);
+
+#ifdef USE_ZMQ
+  void ParseFromZmsg(zmsg_t* msg);
+  zmsg_t* DumpToZmsg();
+#endif
+
+  /**
+   * @return msg size in terms of bytes, ignore meta info.
+   */
+  int size() const;
+  /**
+   * Set source addr.
+   * @param addr unique identify one worker/server/stub in the current job
+   */
+  inline void set_src(int addr) { src_ = addr; }
+  /**
+   * @return source addr.
+   */
+  inline int src() const { return src_; }
+  /**
+   * Set destination addr.
+   * @param addr unique identify one worker/server/stub in the current job
+   */
+  inline void set_dst(int addr) { dst_ = addr; }
+  /**
+   * @return dst addr.
+   */
+  inline int dst() const { return dst_; }
+  /**
+   * Set msg type, e.g., kPut, kGet, kUpdate, kRequest
+   */
+  inline void set_type(int type) { type_ = type; }
+  /**
+   * @return msg type.
+   */
+  inline int type() const { return type_; }
+  /**
+   * Set msg target.
+   *
+   * One msg has a target to identify some entity in worker/server/stub.
+   * The target is associated with a version, e.g., Param version.
+   */
+  inline void set_trgt(int val, int version) {
+    trgt_val_ = val;
+    trgt_version_ = version;
+  }
+  inline int trgt_val() const { return trgt_val_; }
+  inline int trgt_version() const { return trgt_version_; }
+
+ protected:
+  int src_ = 0;
+  int dst_ = 0;
+  int type_ = 0;
+  int trgt_val_ = 0;
+  int trgt_version_ = 0;
+#ifdef USE_ZMQ
+  zmsg_t* msg_ = nullptr;
+  zframe_t *frame_ = nullptr;
+#endif
+};
+
+inline void DeleteMsg(Msg** msg) {
+  delete *msg;
+  *msg = nullptr;
+}
+
+}  // namespace singa
+
+#endif  // SINGA_COMM_MSG_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/comm/socket.h
----------------------------------------------------------------------
diff --git a/include/comm/socket.h b/include/comm/socket.h
new file mode 100644
index 0000000..f2ffb4d
--- /dev/null
+++ b/include/comm/socket.h
@@ -0,0 +1,174 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#ifndef SINGA_COMM_SOCKET_H_
+#define SINGA_COMM_SOCKET_H_
+
+#ifdef USE_ZMQ
+#include <czmq.h>
+#endif
+#include <map>
+#include <string>
+#include <vector>
+#include "comm/msg.h"
+
+namespace singa {
+
+const std::string kInprocRouterEndpoint = "inproc://router";
+
+class SocketInterface {
+ public:
+  virtual ~SocketInterface() {}
+  /**
+    * Send a message to connected socket(s), non-blocking. The message
+    * will be deallocated after sending, thus should not be used after
+    * calling Send();
+    *
+    * @param msg The message to be sent
+    * @return 1 for success queuing the message for sending, 0 for failure
+    */
+  virtual int Send(Msg** msg) = 0;
+  /**
+    * Receive a message from any connected socket.
+    *
+    * @return a message pointer if success; nullptr if failure
+    */
+  virtual Msg* Receive() = 0;
+  /**
+   * @return Identifier of the implementation dependent socket. E.g., zsock_t*
+   * for ZeroMQ implementation and rank for MPI implementation.
+   */
+  virtual void* InternalID() const = 0;
+};
+
+class Poller {
+ public:
+  Poller();
+  explicit Poller(SocketInterface* socket);
+  /**
+    * Add a socket for polling; Multiple sockets can be polled together by
+    * adding them into the same poller.
+    */
+  void Add(SocketInterface* socket);
+  /**
+    * Poll for all sockets added into this poller.
+    * @param timeout Stop after this number of mseconds
+    * @return pointer To the socket if it has one message in the receiving
+    * queue; nullptr if no message in any sockets,
+    */
+  SocketInterface* Wait(int duration);
+
+  /**
+   * @return true if the poller is terminated due to process interupt
+   */
+  virtual bool Terminated();
+
+ protected:
+#ifdef USE_ZMQ
+  zpoller_t *poller_;
+  std::map<zsock_t*, SocketInterface*> zsock2Socket_;
+#endif
+};
+
+class Dealer : public SocketInterface {
+ public:
+  /*
+   * @param id Local dealer ID within a procs if the dealer is from worker or
+   * server thread, starts from 1 (0 is used by the router); or the connected
+   * remote procs ID for inter-process dealers from the stub thread.
+   */
+  Dealer();
+  explicit Dealer(int id);
+  ~Dealer() override;
+  /**
+    * Setup the connection with the router.
+    *
+    * @param endpoint Identifier of the router. For intra-process
+    * connection, the endpoint follows the format of ZeroMQ, i.e.,
+    * starting with "inproc://"; in Singa, since each process has one
+    * router, hence we can fix the endpoint to be "inproc://router" for
+    * intra-process. For inter-process, the endpoint follows ZeroMQ's
+    * format, i.e., IP:port, where IP is the connected process.
+    * @return 1 connection sets up successfully; 0 otherwise
+    */
+  int Connect(const std::string& endpoint);
+  int Send(Msg** msg) override;
+  Msg* Receive() override;
+  void* InternalID() const override;
+
+ protected:
+  int id_ = -1;
+#ifdef USE_ZMQ
+  zsock_t* dealer_ = nullptr;
+  zpoller_t* poller_ = nullptr;
+#endif
+};
+
+class Router : public SocketInterface {
+ public:
+  Router();
+  /**
+   * There is only one router per procs, hence its local id is 0 and is not set
+   * explicitly.
+   *
+   * @param bufsize Buffer at most this number of messages
+   */
+  explicit Router(int bufsize);
+  ~Router() override;
+  /**
+   * Setup the connection with dealers.
+   *
+   * It automatically binds to the endpoint for intra-process communication,
+   * i.e., "inproc://router".
+   *
+   * @param endpoint The identifier for the Dealer socket in other process
+   * to connect. It has the format IP:Port, where IP is the host machine.
+   * If endpoint is empty, it means that all connections are
+   * intra-process connection.
+   * @return number of connected dealers.
+   */
+  int Bind(const std::string& endpoint);
+  /**
+   * If the destination socket has not connected yet, buffer this the message.
+   */
+  int Send(Msg** msg) override;
+  Msg* Receive() override;
+  void* InternalID() const override;
+
+ protected:
+  int nBufmsg_ = 0;
+  int bufsize_ = 100;
+#ifdef USE_ZMQ
+  zsock_t* router_ = nullptr;
+  zpoller_t* poller_ = nullptr;
+  std::map<int, zframe_t*> id2addr_;
+  std::map<int, std::vector<zmsg_t*>> bufmsg_;
+#endif
+};
+
+#ifdef USE_MPI
+// TODO(wangsheng): add intra-process communication using shared queue
+std::vector<SafeQueue*> MPIQueues;
+#endif
+
+}  // namespace singa
+
+#endif  // SINGA_COMM_SOCKET_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/communication/msg.h
----------------------------------------------------------------------
diff --git a/include/communication/msg.h b/include/communication/msg.h
deleted file mode 100644
index 217d89a..0000000
--- a/include/communication/msg.h
+++ /dev/null
@@ -1,238 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-* 
-*   http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#ifndef SINGA_COMMUNICATION_MSG_H_
-#define SINGA_COMMUNICATION_MSG_H_
-
-// TODO(wangwei): make it a compiler argument
-#define USE_ZMQ
-
-#include <utility>
-#ifdef USE_ZMQ
-#include <czmq.h>
-#endif
-
-namespace singa {
-/**
- * Wrapper to generate message address
- * @param grp worker/server group id
- * @param id_or_proc worker/server id or procs id
- * @param type msg type
- */
-inline int Addr(int grp, int id_or_proc, int type) {
-  return (grp << 16) | (id_or_proc << 8) | type;
-}
-
-/**
- * Parse group id from addr.
- *
- * @return group id
- */
-inline int AddrGrp(int addr) {
-  return addr >> 16;
-}
-
-/**
- * Parse worker/server id from addr.
- *
- * @return id
- */
-inline int AddrID(int addr) {
-  static const int mask = (1 << 8) - 1;
-  return (addr >> 8) & mask;
-}
-
-/**
- * Parse worker/server procs from addr.
- *
- * @return procs id
- */
-inline int AddrProc(int addr) {
-  return AddrID(addr);
-}
-
-/**
- * Parse msg type from addr
- * @return msg type
- */
-inline int AddrType(int addr) {
-  static const int mask = (1 << 8) -1;
-  return addr & mask;
-}
-
-/**
- * Msg used to transfer Param info (gradient or value), feature blob, etc
- * between workers, stubs and servers.
- *
- * Each msg has a source addr and dest addr identified by a unique integer.
- * It is also associated with a target field (value and version) for ease of
- * getting some meta info (e.g., parameter id) from the msg.
- *
- * Other data is added into the message as frames.
- */
-class Msg {
- public:
-  ~Msg();
-  Msg();
-  /**
-   * Construct the msg providing source and destination addr.
-   */
-  Msg(int src, int dst);
-  /**
-   * Copy constructor.
-   */
-  Msg(const Msg& msg);
-  /**
-   * Swap the src/dst addr
-   */
-  void SwapAddr();
-  /**
-   * Add a frame (a chunck of bytes) into the message
-   */
-  void AddFrame(const void* addr, int nBytes);
-  /**
-   * @return num of bytes of the current frame.
-   */
-  int FrameSize();
-  /**
-   * @return the pointer to the current frame data.
-   */
-  void* FrameData();
-  /**
-   * @return the data of the current frame as c string
-   */
-  char* FrameStr();
-  /**
-   * Move the cursor to the first frame.
-   */
-  void FirstFrame();
-  /**
-   * Move the cursor to the last frame.
-   */
-  void LastFrame();
-  /**
-   * Move the cursor to the next frame
-   * @return true if the next frame is not NULL; otherwise false
-   */
-  bool NextFrame();
-  /**
-   *  Add a 'format' frame to the msg (like CZMQ's zsock_send).
-   *
-   *  The format is a string that defines the type of each field.
-   *  The format can contain any of these characters, each corresponding to
-   *  one or two arguments:
-   *  i = int (signed)
-   *  1 = uint8_t
-   *  2 = uint16_t
-   *  4 = uint32_t
-   *  8 = uint64_t
-   *  p = void * (sends the pointer value, only meaningful over inproc)
-   *  s = char**
-   *
-   *  Returns size of the added content.
-   */
-  int AddFormatFrame(const char *format, ...);
-  /**
-   *  Parse the current frame added using AddFormatFrame(const char*, ...).
-   *
-   *  The format is a string that defines the type of each field.
-   *  The format can contain any of these characters, each corresponding to
-   *  one or two arguments:
-   *  i = int (signed)
-   *  1 = uint8_t
-   *  2 = uint16_t
-   *  4 = uint32_t
-   *  8 = uint64_t
-   *  p = void * (sends the pointer value, only meaningful over inproc)
-   *  s = char**
-   *
-   *  Returns size of the parsed content.
-   */
-  int ParseFormatFrame(const char* format, ...);
-
-#ifdef USE_ZMQ
-  void ParseFromZmsg(zmsg_t* msg);
-  zmsg_t* DumpToZmsg();
-#endif
-
-  /**
-   * @return msg size in terms of bytes, ignore meta info.
-   */
-  int size() const;
-  /**
-   * Set source addr.
-   * @param addr unique identify one worker/server/stub in the current job
-   */
-  inline void set_src(int addr) { src_ = addr; }
-  /**
-   * @return source addr.
-   */
-  inline int src() const { return src_; }
-  /**
-   * Set destination addr.
-   * @param addr unique identify one worker/server/stub in the current job
-   */
-  inline void set_dst(int addr) { dst_ = addr; }
-  /**
-   * @return dst addr.
-   */
-  inline int dst() const { return dst_; }
-  /**
-   * Set msg type, e.g., kPut, kGet, kUpdate, kRequest
-   */
-  inline void set_type(int type) { type_ = type; }
-  /**
-   * @return msg type.
-   */
-  inline int type() const { return type_; }
-  /**
-   * Set msg target.
-   *
-   * One msg has a target to identify some entity in worker/server/stub.
-   * The target is associated with a version, e.g., Param version.
-   */
-  inline void set_trgt(int val, int version) {
-    trgt_val_ = val;
-    trgt_version_ = version;
-  }
-  inline int trgt_val() const { return trgt_val_; }
-  inline int trgt_version() const { return trgt_version_; }
-
- protected:
-  int src_ = 0;
-  int dst_ = 0;
-  int type_ = 0;
-  int trgt_val_ = 0;
-  int trgt_version_ = 0;
-#ifdef USE_ZMQ
-  zmsg_t* msg_ = nullptr;
-  zframe_t *frame_ = nullptr;
-#endif
-};
-
-inline void DeleteMsg(Msg** msg) {
-  delete *msg;
-  *msg = nullptr;
-}
-
-}  // namespace singa
-
-#endif  // SINGA_COMMUNICATION_MSG_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/communication/socket.h
----------------------------------------------------------------------
diff --git a/include/communication/socket.h b/include/communication/socket.h
deleted file mode 100644
index 3590577..0000000
--- a/include/communication/socket.h
+++ /dev/null
@@ -1,174 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-* 
-*   http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#ifndef SINGA_COMMUNICATION_SOCKET_H_
-#define SINGA_COMMUNICATION_SOCKET_H_
-
-#ifdef USE_ZMQ
-#include <czmq.h>
-#endif
-#include <map>
-#include <string>
-#include <vector>
-#include "communication/msg.h"
-
-namespace singa {
-
-const std::string kInprocRouterEndpoint = "inproc://router";
-
-class SocketInterface {
- public:
-  virtual ~SocketInterface() {}
-  /**
-    * Send a message to connected socket(s), non-blocking. The message
-    * will be deallocated after sending, thus should not be used after
-    * calling Send();
-    *
-    * @param msg The message to be sent
-    * @return 1 for success queuing the message for sending, 0 for failure
-    */
-  virtual int Send(Msg** msg) = 0;
-  /**
-    * Receive a message from any connected socket.
-    *
-    * @return a message pointer if success; nullptr if failure
-    */
-  virtual Msg* Receive() = 0;
-  /**
-   * @return Identifier of the implementation dependent socket. E.g., zsock_t*
-   * for ZeroMQ implementation and rank for MPI implementation.
-   */
-  virtual void* InternalID() const = 0;
-};
-
-class Poller {
- public:
-  Poller();
-  explicit Poller(SocketInterface* socket);
-  /**
-    * Add a socket for polling; Multiple sockets can be polled together by
-    * adding them into the same poller.
-    */
-  void Add(SocketInterface* socket);
-  /**
-    * Poll for all sockets added into this poller.
-    * @param timeout Stop after this number of mseconds
-    * @return pointer To the socket if it has one message in the receiving
-    * queue; nullptr if no message in any sockets,
-    */
-  SocketInterface* Wait(int duration);
-
-  /**
-   * @return true if the poller is terminated due to process interupt
-   */
-  virtual bool Terminated();
-
- protected:
-#ifdef USE_ZMQ
-  zpoller_t *poller_;
-  std::map<zsock_t*, SocketInterface*> zsock2Socket_;
-#endif
-};
-
-class Dealer : public SocketInterface {
- public:
-  /*
-   * @param id Local dealer ID within a procs if the dealer is from worker or
-   * server thread, starts from 1 (0 is used by the router); or the connected
-   * remote procs ID for inter-process dealers from the stub thread.
-   */
-  Dealer();
-  explicit Dealer(int id);
-  ~Dealer() override;
-  /**
-    * Setup the connection with the router.
-    *
-    * @param endpoint Identifier of the router. For intra-process
-    * connection, the endpoint follows the format of ZeroMQ, i.e.,
-    * starting with "inproc://"; in Singa, since each process has one
-    * router, hence we can fix the endpoint to be "inproc://router" for
-    * intra-process. For inter-process, the endpoint follows ZeroMQ's
-    * format, i.e., IP:port, where IP is the connected process.
-    * @return 1 connection sets up successfully; 0 otherwise
-    */
-  int Connect(const std::string& endpoint);
-  int Send(Msg** msg) override;
-  Msg* Receive() override;
-  void* InternalID() const override;
-
- protected:
-  int id_ = -1;
-#ifdef USE_ZMQ
-  zsock_t* dealer_ = nullptr;
-  zpoller_t* poller_ = nullptr;
-#endif
-};
-
-class Router : public SocketInterface {
- public:
-  Router();
-  /**
-   * There is only one router per procs, hence its local id is 0 and is not set
-   * explicitly.
-   *
-   * @param bufsize Buffer at most this number of messages
-   */
-  explicit Router(int bufsize);
-  ~Router() override;
-  /**
-   * Setup the connection with dealers.
-   *
-   * It automatically binds to the endpoint for intra-process communication,
-   * i.e., "inproc://router".
-   *
-   * @param endpoint The identifier for the Dealer socket in other process
-   * to connect. It has the format IP:Port, where IP is the host machine.
-   * If endpoint is empty, it means that all connections are
-   * intra-process connection.
-   * @return number of connected dealers.
-   */
-  int Bind(const std::string& endpoint);
-  /**
-   * If the destination socket has not connected yet, buffer this the message.
-   */
-  int Send(Msg** msg) override;
-  Msg* Receive() override;
-  void* InternalID() const override;
-
- protected:
-  int nBufmsg_ = 0;
-  int bufsize_ = 100;
-#ifdef USE_ZMQ
-  zsock_t* router_ = nullptr;
-  zpoller_t* poller_ = nullptr;
-  std::map<int, zframe_t*> id2addr_;
-  std::map<int, std::vector<zmsg_t*>> bufmsg_;
-#endif
-};
-
-#ifdef USE_MPI
-// TODO(wangsheng): add intra-process communication using shared queue
-std::vector<SafeQueue*> MPIQueues;
-#endif
-
-}  // namespace singa
-
-#endif  // SINGA_COMMUNICATION_SOCKET_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/driver.h
----------------------------------------------------------------------
diff --git a/include/driver.h b/include/driver.h
index b33c7cc..9ae4b27 100644
--- a/include/driver.h
+++ b/include/driver.h
@@ -22,6 +22,7 @@
 #ifndef SINGA_DRIVER_H_
 #define SINGA_DRIVER_H_
 
+#include <vector>
 #include "proto/job.pb.h"
 #include "proto/singa.pb.h"
 #include "utils/factory.h"
@@ -29,20 +30,70 @@
 #include "utils/singleton.h"
 #include "utils/updater.h"
 #include "neuralnet/layer.h"
-#include "trainer/worker.h"
+#include "./worker.h"
+#include "./server.h"
 
 namespace singa {
-
+using std::vector;
 class Driver {
  public:
   /**
-   * Init SINGA, including init glog, parse job id and job conf from cmd line,
-   * and register built-in layer, worker, updater, param subclasses.
+   * Init SINGA
+   * - init glog
+   * - parse job id and job conf from cmd line
+   * - register built-in layer, worker, updater, param subclasses.
    *
    * May be used for MPI init if it is used for message passing.
    */
   void Init(int argc, char** argv);
   /**
+   * Update job configuration and call Train(const JobProto&) to start the
+   * training.
+   *
+   * It sets up the logging path and checkpoing files (if resume), and checks
+   * the existence of the workspace folder .
+   *
+   * @param[in] resume if true resume the training from the latest checkpoint
+   * files.
+   * @param[in] job_conf job configuration.
+   */
+  void Train(bool resume, const JobProto& job_conf);
+  /**
+   * Create workers and servers to conduct the training.
+   *
+   * @param[in] job_conf job configuration with all necessary fields set (e.g.,
+   * by Train(bool, const JobProto&).
+   */
+  void Train(const JobProto& job_conf);
+  /**
+   * Setting the checkpoint field of the job configuration to resume training.
+   *
+   * The checkpoint folder will be searched to get the files for the latest
+   * checkpoint, which will be added into the checkpoint field. The workers
+   * would then load the values of params from the checkpoint files.
+   *
+   * @param job_conf job configuration
+   */
+  void SetupForResume(JobProto* job_conf);
+  /**
+   * Create server instances.
+   *
+   * @param[in] job_conf job configuration.
+   * @param[in] net training neural network.
+   * @return server instances
+   */
+  const vector<Server*> CreateServers(const JobProto& job_conf, NeuralNet* net);
+  /**
+   * Create workers instances.
+   * @param[in] job_conf job configuration.
+   * @param[in] net training neural network.
+   * @return worker instances
+   */
+  const vector<Worker*> CreateWorkers(const JobProto& job_conf, NeuralNet* net);
+
+
+  /*********** Subclasses registers *************************/
+  /**
    * Register a Layer subclass.
    *
    * @param type layer type ID. If called to register built-in subclasses,
@@ -103,12 +154,7 @@ class Driver {
   template<typename Subclass, typename Type>
   int RegisterParamGenerator(const Type& type);
 
-  /**
-   * Submit the job configuration for starting the job.
-   * @param resume resume from last checkpoint if true.
-   * @param job job configuration
-   */
-  void Submit(bool resume, const JobProto& job);
+  /****************** Access function ********************/
   /**
    * @return job ID which is generated by zookeeper and passed in by the
    * launching script.

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/neuralnet/connection_layer.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/connection_layer.h b/include/neuralnet/connection_layer.h
index 75f399c..1976fb9 100644
--- a/include/neuralnet/connection_layer.h
+++ b/include/neuralnet/connection_layer.h
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -56,12 +56,12 @@ class BridgeLayer : virtual public ConnectionLayer {
  */
 class BridgeDstLayer : public BridgeLayer {
  public:
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric* perf) override {
+  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override {
     // reset ready_ for next iteration.
     ready_ = false;
   }
-  void ComputeGradient(int flag, Metric* perf) override {}
+  void ComputeGradient(int flag,  const vector<Layer*>& srclayers) override {}
   bool is_bridgedstlayer() const {
     return true;
   }
@@ -73,25 +73,32 @@ class BridgeDstLayer : public BridgeLayer {
  */
 class BridgeSrcLayer : public BridgeLayer {
  public:
-  void ComputeFeature(int flag, Metric* perf) override {}
-  void ComputeGradient(int flag, Metric* perf) override {
+  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override {
+    CHECK_GE(srclayers.size(), 1);
+    srclayer_ = srclayers.at(0);
+  }
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override {}
+  void ComputeGradient(int flag,  const vector<Layer*>& srclayers) override {
     ready_ = false;
   }
   const Blob<float>& data(const Layer* from) const override {
-    return srclayers_[0]->data(this);
+    return srclayer_->data(this);
   }
   Blob<float>* mutable_data(const Layer* from) override {
-    return srclayers_[0]->mutable_data(this);
+    return srclayer_->mutable_data(this);
   }
   const Blob<float>& grad(const Layer* from) const override {
-    return srclayers_[0]->grad(this);
+    return srclayer_->grad(this);
   }
   Blob<float>* mutable_grad(const Layer* from) override {
-    return srclayers_[0]->mutable_grad(this);
+    return srclayer_->mutable_grad(this);
   }
   bool is_bridgesrclayer() const override {
     return true;
   }
+
+ private:
+  Layer* srclayer_;
 };
 
 
@@ -103,9 +110,9 @@ class BridgeSrcLayer : public BridgeLayer {
  */
 class ConcateLayer : public ConnectionLayer {
  public:
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric* perf) override;
-  void ComputeGradient(int flag, Metric* perf) override;
+  void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override;
 };
 
 /**
@@ -116,9 +123,9 @@ class ConcateLayer : public ConnectionLayer {
  */
 class SliceLayer : public ConnectionLayer {
  public:
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric *perf) override;
-  void ComputeGradient(int flag, Metric* perf) override;
+  void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override;
 
  private:
   std::vector<Blob<float>> datavec_;
@@ -136,9 +143,9 @@ class SliceLayer : public ConnectionLayer {
  */
 class SplitLayer : public ConnectionLayer {
  public:
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric* perf) override;
-  void ComputeGradient(int flag, Metric* perf) override;
+  void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override;
 
  protected:
   Blob<float> grads_;
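
The signature changes above are easier to see in a concrete layer written against the
new API: source layers arrive as an argument to Setup/ComputeFeature/ComputeGradient
instead of being read from a srclayers_ member. The sketch below is illustrative only;
the class name ForwardLayer and the Blob helpers ReshapeLike and CopyFrom are
assumptions, not part of this patch.

#include "neuralnet/layer.h"

namespace singa {

// Hypothetical pass-through connection layer: forwards data and gradients
// between its single source layer and its destination layers.
class ForwardLayer : public ConnectionLayer {
 public:
  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override {
    Layer::Setup(conf, srclayers);
    CHECK_EQ(srclayers.size(), 1);                 // exactly one source expected
    data_.ReshapeLike(srclayers[0]->data(this));   // mirror the source shape (assumed helper)
    grad_.ReshapeLike(data_);
  }
  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override {
    data_.CopyFrom(srclayers[0]->data(this));      // forward: copy the feature blob
  }
  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {
    srclayers[0]->mutable_grad(this)->CopyFrom(grad_);  // backward: push gradient to the source
  }
};

}  // namespace singa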

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/neuralnet/input_layer.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/input_layer.h b/include/neuralnet/input_layer.h
index 709912d..b5f2dd4 100644
--- a/include/neuralnet/input_layer.h
+++ b/include/neuralnet/input_layer.h
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,8 +32,8 @@
  *
  * The feature loading phase can be implemented using a single layer or
  * separated into DataLayer (for loading features as records) and ParserLayer
- * (for parsing features from records). SINGA has provided some built-in layers
- * for DataLayer and ParserLayer.
+ * (for parsing features from records). SINGA has provided some subclasses of
+ * DataLayer and ParserLayer.
  *
  * Data prefetching can be implemented as a sub-class of InputLayer.
  * SINGA provides a built-in PrefetchLayer which embeds DataLayer and
@@ -41,20 +41,15 @@
  */
 namespace singa {
 /**
- * Base layer for reading records from local Shard, HDFS, lmdb, etc.
+ * Base layer for reading ::Record from local Shard, HDFS, lmdb, etc.
  */
 class DataLayer: virtual public InputLayer {
  public:
-  void ComputeGradient(int flag, Metric* perf) override {}
-  Blob<float>* mutable_data(const Layer* layer) override {
-    return nullptr;
-  }
-  Blob<float>* mutable_grad(const Layer* layer) override {
-    return nullptr;
-  }
+  Blob<float>* mutable_data(const Layer* layer) override { return nullptr; }
   ConnectionType dst_layer_connection() const override {
     return kOneToMany;
   }
+
   inline int batchsize() const { return batchsize_; }
   virtual const Record& sample() const {
     return sample_;
@@ -81,8 +76,8 @@ class ShardDataLayer : public DataLayer {
  public:
   ~ShardDataLayer();
 
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric *perf) override;
+  void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
 
  private:
   DataShard* shard_;
@@ -94,9 +89,9 @@ class LMDBDataLayer : public DataLayer {
  public:
   ~LMDBDataLayer();
 
-  void Setup(const LayerProto& proto, int npartitions) override;
+  void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void OpenLMDB(const std::string& path);
-  void ComputeFeature(int flag, Metric *perf) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
   void ConvertCaffeDatumToRecord(const CaffeDatum& datum,
                                  SingleLabelImageRecord* record);
 
@@ -114,8 +109,8 @@ class LMDBDataLayer : public DataLayer {
  */
 class ParserLayer : public InputLayer {
  public:
-  void ComputeFeature(int flag, Metric* perf) override;
-  void ComputeGradient(int flag, Metric* perf) override {}
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {}
   ConnectionType dst_layer_connection() const override {
     return kOneToMany;
   }
@@ -124,13 +119,6 @@ class ParserLayer : public InputLayer {
    */
   virtual void ParseRecords(int flag, const std::vector<Record>& records,
       Blob<float>* blob) = 0;
-  Blob<float>* mutable_grad(const Layer* layer) override {
-    return nullptr;
-  }
-  const Blob<float>& grad(const Layer* from) const  override {
-    CHECK(false) << "Parser layer has not gradient blob";
-    return grad_;
-  }
 };
 
 /**
@@ -138,7 +126,7 @@ class ParserLayer : public InputLayer {
  */
 class LabelLayer : public ParserLayer {
  public:
-  void Setup(const LayerProto& proto, int npartitions) override;
+  void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ParseRecords(int flag, const std::vector<Record>& records,
                     Blob<float>* blob) override;
 };
@@ -148,7 +136,7 @@ class LabelLayer : public ParserLayer {
  */
 class MnistLayer : public ParserLayer {
  public:
-  void Setup(const LayerProto& proto, int npartitions) override;
+  void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ParseRecords(int flag, const std::vector<Record>& records,
                     Blob<float>* blob) override;
 
@@ -161,7 +149,7 @@ class MnistLayer : public ParserLayer {
  */
 class RGBImageLayer : public ParserLayer {
  public:
-  void Setup(const LayerProto& proto, int npartitions) override;
+  void Setup(const LayerProto& proto, const vector<Layer*>& srclayers) override;
   void ParseRecords(int flag, const std::vector<Record>& records,
                     Blob<float>* blob) override;
 
@@ -181,8 +169,8 @@ class RGBImageLayer : public ParserLayer {
 class PrefetchLayer : public Layer {
  public:
   ~PrefetchLayer();
-  void ComputeFeature(int flag, Metric* perf) override;
-  void ComputeGradient(int flag, Metric* perf) override {}
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {}
 
  protected:
   std::thread thread_;
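
The same pattern applies to input layers: a concrete parser only needs Setup() and
ParseRecords(). The sketch below shows the shape of such a subclass under the new API;
the class name, kFeatureDim, and the way features are pulled out of a Record are
placeholders, and the Blob calls (Reshape, mutable_cpu_data) are assumed to behave as
elsewhere in the code base.

#include "neuralnet/input_layer.h"

namespace singa {

// Hypothetical parser that copies a fixed-length float vector per record.
class DenseFeatureParserLayer : public ParserLayer {
 public:
  static const int kFeatureDim = 128;  // placeholder feature length

  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override {
    ParserLayer::Setup(conf, srclayers);
    // The upstream DataLayer decides the batch size.
    int batchsize = static_cast<DataLayer*>(srclayers.at(0))->batchsize();
    data_.Reshape(std::vector<int>{batchsize, kFeatureDim});
  }

  void ParseRecords(int flag, const std::vector<Record>& records,
                    Blob<float>* blob) override {
    float* dst = blob->mutable_cpu_data();
    for (size_t i = 0; i < records.size(); ++i) {
      // Copy kFeatureDim floats from records[i] into dst here; the exact
      // accessors depend on the record proto and are omitted in this sketch.
      dst += kFeatureDim;
    }
  }
};

}  // namespace singa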

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/neuralnet/layer.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/layer.h b/include/neuralnet/layer.h
index 05377b1..bf83163 100644
--- a/include/neuralnet/layer.h
+++ b/include/neuralnet/layer.h
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,16 +33,22 @@
 #include "utils/param.h"
 
 namespace singa {
-
+using std::vector;
 /**
  * Base layer class.
  *
- * Children should implement at least
+ * Subclasses should implement at least
 * Layer::ComputeFeature() and Layer::ComputeGradient()
- * functions for contrastive-divergence/back-propagation algorithm.
+ * functions in accordance with the NeuralNet::TrainOneBatch function.
  */
 class Layer {
  public:
+  /**
+   * Create a layer instance of the subclass registered with proto.type().
+   *
+   * @param proto configuration of the layer instance.
+   * @return pointer to the newly created layer instance.
+   */
   static Layer* Create(const LayerProto& proto);
 
   Layer() {}
@@ -50,49 +56,51 @@ class Layer {
   /**
    * Setup layer properties.
    *
-   * Setup the shapes for data and parameters, also setup some properties
-   * based on the layer configuration and connected layers.
+   * Setup members, e.g., shapes of Param objects, based on the layer
+   * configuration and connected layers.
+   * It should check the partition settings when setting up the properties.
    *
-   * @param proto layer configuration.
-   * @param npartitions num of total partitions of the original layer. This
-   * layer should be setup as one partition.
+   * @param conf layer configuration.
+   * @param srclayers source layers that connect to this layer.
    */
-  virtual void Setup(const LayerProto& proto, int npartitions = 1) {
-    CHECK_GE(npartitions, 1);
-    layer_proto_ = proto;
+  virtual void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) {
+    layer_conf_ = conf;
   }
   /**
    * Compute features of this layer based on connected layers.
    *
-   * @param perf pointer to Metric obj for collect and aggregate performance
+   * @param[in] flag set by the TrainOneBatch function, e.g., to indicate the
+   * running phase (kForward|kTrain, kForward|kTest, etc).
+   * @param[in] srclayers source layers that connect to this layer.
    */
-  virtual void ComputeFeature(int flag, Metric* perf) = 0;
+  virtual void ComputeFeature(int flag, const vector<Layer*>& srclayers) = 0;
   /**
-   * Compute gradients for parameters and connected layers.
-   * @param flag used to get the calling phase, e.g., forward of training
-   * (kForward | kTrain)
-   * @param flag used to get the calling phase, e.g., forward of training
+   * Compute gradients for parameters associated with this layer.
+   * It may also compute the gradients of the loss w.r.t. the source layers.
+   *
+   * \copydetails ComputeFeature().
    */
-  virtual void ComputeGradient(int flag, Metric* perf) = 0;
+  virtual void ComputeGradient(int flag, const vector<Layer*>& srclayers) = 0;
   /**
-   * Layers that have paramters must override this function.
-   * @param flag used to get the calling phase, e.g., forward of training
-   * (kForward | kTrain)
-   * @return parameters associated with this layer
+   * Layers that have paramters must override this function to return all Param
+   * objects associated with this layer.
+   *
+   * @return parameters associated with this layer.
    */
   virtual const std::vector<Param*> GetParams() const {
     return std::vector<Param*> {};
   }
   /**
-   * Return the connection type between one neuron of this layer and
-   * its source layer.
+   * Return the connection type between one neuron of this layer and its source
+   * layer.
+   *
    * Currently support two connection types: kOneToOne, and kOneToAll.
-   * kOneToOne indicates the neuron depends on only one neuron from src layer.
-   * kOneToAll indicates the neuron depends on all neurons from src layer.
+   * - kOneToOne indicates the neuron depends on only one neuron from src layer.
+   * - kOneToAll indicates the neuron depends on all neurons from src layer.
    * TODO(wangwei) support kOneToMany.
    *
-   * @param k index of source layer (current only support k = 0.
-   * @param connection type.
+   * @param[in] k index of the source layer; currently only k = 0 is supported.
+   * @return connection type.
    */
   virtual ConnectionType src_neuron_connection(int k) const {
     // CHECK_LT(k, srclayers_.size());
@@ -102,89 +110,101 @@ class Layer {
    * Return the connection type of this layer and all dst layers.
    *
    * Currently support two connection types: kOneToOne, and kOneToMany.
-   * kOneToOne indicates the users implement the ComputeFeature and
-   * ComputeGradient function considering only one dest layer. In this case,
+   * - kOneToOne indicates the users implement the ComputeFeature and
+   * ComputeGradient function considering only one dst layer. In this case,
    * a SplitLayer will be added automatically to connect this layer with all
   * dst layers.
-   * kOneToMany indicates the users has already considered multiple dest layers
-   * in the implementation.
+   * - kOneToMany indicates this layer has already considered multiple dst
+   *   layers in the implementation.
+   *
+   * @return connection type; the default is kOneToOne.
    */
   virtual ConnectionType dst_layer_connection() const {
     return kOneToOne;
   }
   /**
-   * For print debug info about each layer, e.g., norm of feature vector,
-   * norm of parameters.
+   * To display layer info, e.g., aggregated loss/accuracy, or norm of feature
+   * vector and norm of parameters.
    *
-   * @param step training/test/validation step
-   * @param flag used to get the calling phase, e.g., forward of training
-   * (kForward | kTrain)
-   * @return debug info about this layer.
+   * @param[in] debug whether to print the debug info
+   * @param[in] flag used to get the calling phase, e.g., forward of training
+   * (kForward | kTrain).
+   * @return info string about this layer, which is printed into the log.
    */
-  virtual const std::string DebugString(int step, int flag);
+  virtual const std::string ToString(bool debug, int flag);
   /**
-   * @return partition dimension of this layer.
-   * -1 for no partition;
-   *  0 for partition the mini-batch into sub-mini-batch.
-   *  1 for partition the layer feature vector into sub-vector.
+   * @return partition dimension of this layer,
+   * - -1 for no partitioning.
+   * -  0 for partitioning on the data dimension, i.e., splitting the mini-batch
+   *    into sub-mini-batches.
+   * -  1 for partitioning on the feature dimension, i.e., the feature
+   *    vector of each instance is split into sub-vectors.
    */
   inline int partition_dim() const {
-    CHECK_LE(layer_proto_.partition_dim(), 1);
-    return layer_proto_.partition_dim();
+    CHECK_LE(layer_conf_.partition_dim(), 1);
+    return layer_conf_.partition_dim();
   }
-  inline int partition_id() const { return layer_proto_.partition_id(); }
-  inline int type() const { return layer_proto_.type(); }
   /**
-   * Return name of this layer
+   * @return the partition ID (i.e., the ID of the worker to which this layer
+   * is dispatched); this layer is a sub-layer partitioned from the
+   * original layer.
+   */
+  inline int partition_id() const { return layer_conf_.partition_id(); }
+  /**
+   * @return total number of partitions (i.e., sub-layers) of the original
+   * layer from which this layer was partitioned.
+   */
+  inline int num_partitions() const { return layer_conf_.num_partitions(); }
+  /**
+   * @return the type of this layer; only valid for built-in layer types.
    */
-  inline const std::string &name() const { return layer_proto_.name(); }
+  inline LayerType type() const { return layer_conf_.type(); }
   /**
-   * @return name of src data blob, used by prefetch layer to locate the data
-   * blob in parser layers; The default value is "unknown"; If the
-   * src layer is the prefetch layer and there are more than one parser layers,
-   * this value be set.
-  const std::string &datablob() const {
-    return layer_proto_.datablob();
+   * @return user-defined layer type.
+   */
+  inline const std::string& user_type() const {
+    return layer_conf_.user_type();
   }
+  /**
+   * @return name of this layer.
    */
+  inline const std::string& name() const { return layer_conf_.name(); }
   /**
-   * @return a const ref for Blob storing neuron values of this layer for BP
+   * @param[in] from pointer to one of the dst layers. Some layers have more
+   * than one data Blob; in this case, this argument identifies the layer
+   * that is requesting the data Blob.
+   * @return a const ref to the Blob storing feature values of this layer.
    */
   virtual const Blob<float>& data(const Layer* from) const {
     return data_;
   }
+  /**
+   * @see data().
+   * @return the pointer to the Blob storing feature values of this layer.
+   */
   virtual Blob<float>* mutable_data(const Layer* from) {
     return &data_;
   }
+  /**
+   * @see data().
+   * @return a const ref to the Blob storing gradients of this layer, mainly
+   * used in the BP algorithm.
+   */
   virtual const Blob<float>& grad(const Layer* from) const {
     return grad_;
   }
   /**
-   * @return a pointer to storing neuron grads of this layer for BP
+   * @see data().
+   * @return a pointer to the Blob storing gradients of this layer, mainly
+   * used in the BP algorithm.
    */
   virtual Blob<float>* mutable_grad(const Layer* from) {
     return &grad_;
   }
-  /**
-   * return LayerS that connected to this layer
-   */
-  inline const std::vector<Layer*> srclayers() const { return srclayers_; }
-  /**
-   * return LayerS that this layer connected to
-   */
-  inline const std::vector<Layer*> dstlayers() const { return dstlayers_; }
-  inline int srclayers_size() const { return srclayers_.size(); }
-  inline int dstlayers_size() const { return dstlayers_.size(); }
-  inline void clear_dstlayers() { dstlayers_.clear(); }
-  inline void clear_srclayers() { srclayers_.clear(); }
-  inline void add_srclayer(Layer* src) { srclayers_.push_back(src); }
-  inline void add_dstlayer(Layer* dst) { dstlayers_.push_back(dst); }
 
  protected:
-  LayerProto layer_proto_;
+  LayerProto layer_conf_;
   Blob<float> data_, grad_;
-  std::vector<Layer*> srclayers_, dstlayers_;
 };
 
 /**
@@ -199,29 +219,59 @@ class ConnectionLayer : virtual public Layer {
  * parsing records.
  */
 class InputLayer : virtual public Layer {
-  // defined as a layer category
+ public:
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {}
+  Blob<float>* mutable_grad(const Layer* layer) override {
+    // LOG(FATAL) << "Input layer has no gradient blob";
+    return nullptr;
+  }
+  const Blob<float>& grad(const Layer* from) const override {
+    // LOG(FATAL) << "Input layer has no gradient blob";
+    return grad_;
+  }
 };
 
 
+/**
+ * Base layer for calculating loss and doing BackPropagation.
+ */
+class LossLayer : virtual public Layer {
+ public:
+  const std::string ToString(bool debug, int flag) override;
+  Blob<float>* mutable_grad(const Layer* layer) override {
+    LOG(FATAL) << "Loss layer has no gradient blob";
+    return nullptr;
+  }
+  const Blob<float>& grad(const Layer* from) const override {
+    LOG(FATAL) << "Loss layer has no gradient blob";
+    return grad_;
+  }
+ protected:
+  Metric metric_;
+};
+
+/**
+ * Base layer for feature transformation, e.g., ConvolutionLayer, PoolingLayer,
+ * etc.
+ */
 class NeuronLayer : virtual public Layer {
   // defined as a layer category
 };
 
 /**
- * Base layer for calculating loss and other metrics, e.g., precison.
+ * Base layer for collecting features into a disk file, HTTP stream, etc.
 */
-class LossLayer : virtual public Layer {
+class OutputLayer : virtual public Layer {
  public:
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {}
   Blob<float>* mutable_grad(const Layer* layer) override {
+    LOG(FATAL) << "Output layer has no gradient blob";
     return nullptr;
   }
   const Blob<float>& grad(const Layer* from) const override {
     LOG(FATAL) << "Output layer has no gradient blob";
     return grad_;
   }
-
- protected:
-  Blob<float> metric_;
 };
 
 }  // namespace singa
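
Putting the new Layer and NeuralNet interfaces together, a BP-style iteration now looks
roughly like the sketch below: the caller fetches each layer's sources from the net,
passes them into ComputeFeature/ComputeGradient, and collects per-layer info through
ToString(). This is only an illustration of the calling convention, not the Worker's
code; it assumes layers() is topologically ordered, that the kForward/kBackward/kTrain
flags combine as in the comments above, and that every layer (including input layers)
has an entry in the net's source-layer map.

#include "neuralnet/neuralnet.h"

void ForwardBackward(singa::NeuralNet* net, int step) {
  std::string info;
  const std::vector<singa::Layer*>& layers = net->layers();
  for (singa::Layer* layer : layers) {                          // forward pass
    layer->ComputeFeature(singa::kForward | singa::kTrain, net->srclayers(layer));
    info += layer->ToString(false, singa::kForward | singa::kTrain);
  }
  for (auto it = layers.rbegin(); it != layers.rend(); ++it) {  // backward pass
    (*it)->ComputeGradient(singa::kBackward | singa::kTrain, net->srclayers(*it));
  }
  LOG(INFO) << "step " << step << ": " << info;                 // info reported per worker
}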

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/neuralnet/loss_layer.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/loss_layer.h b/include/neuralnet/loss_layer.h
index 3af0b46..a48a8e7 100644
--- a/include/neuralnet/loss_layer.h
+++ b/include/neuralnet/loss_layer.h
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,30 +22,36 @@
 #ifndef SINGA_NEURALNET_LOSS_LAYER_H_
 #define SINGA_NEURALNET_LOSS_LAYER_H_
 
+#include <vector>
 #include "neuralnet/layer.h"
 
 /**
- * \file this file includes the declarations of layers that inherit the base
+ * @file this file includes the declarations of layers that inherit the base
  * LossLayer for measuring the objective training loss.
  */
 namespace singa {
+using std::vector;
 /**
- * Squared Euclidean loss as 0.5 ||predict - ground_truth||^2.
+ * Squared Euclidean loss as @f$0.5 ||p - t||^2@f$, where p is the prediction
+ * and t is the ground truth.
  */
 class EuclideanLossLayer : public LossLayer {
  public:
-  void ComputeFeature(int flag, Metric* perf) override;
-  void ComputeGradient(int flag, Metric* perf) override;
+  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override;
 };
 
 /**
- * Cross-entropy loss applied to the probabilities after Softmax.
+ * Cross-entropy loss applied to the probabilities computed from Softmax.
+ * @f$ L_i = -\log P_{t_i} @f$, where @f$ t_i \in \{0, \dots, C-1\} @f$ is the
+ * label of the i-th instance and C is the total number of classes.
  */
 class SoftmaxLossLayer : public LossLayer {
  public:
-  void Setup(const LayerProto& proto, int npartitions) override;
-  void ComputeFeature(int flag, Metric* perf) override;
-  void ComputeGradient(int flag, Metric* perf) override;
+  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override;
 
   /**
   * softmax is not recommended for partitioning because it requires the whole
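
As a concrete reading of the Euclidean loss above, the value and the gradient it sends
back during BP can be written as the small stand-alone sketch below. This is
illustrative only, not the layer's implementation; averaging over the batch is an
assumption about the convention used.

// loss = (1/n) * sum_i 0.5 * ||p_i - t_i||^2, and dL/dp = (p - t) / n.
float EuclideanLoss(const float* p, const float* t, float* grad_p,
                    int batchsize, int dim) {
  float loss = 0.f;
  for (int i = 0; i < batchsize * dim; ++i) {
    float diff = p[i] - t[i];
    loss += 0.5f * diff * diff;
    grad_p[i] = diff / batchsize;   // gradient w.r.t. the prediction
  }
  return loss / batchsize;
}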

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/321ef96a/include/neuralnet/neuralnet.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/neuralnet.h b/include/neuralnet/neuralnet.h
index 693fe19..a202f44 100644
--- a/include/neuralnet/neuralnet.h
+++ b/include/neuralnet/neuralnet.h
@@ -7,9 +7,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
-* 
+*
 *   http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@
 
 #include <string>
 #include <vector>
+#include <unordered_map>
 
 #include "neuralnet/layer.h"
 #include "proto/job.pb.h"
@@ -31,7 +32,6 @@
 #include "utils/graph.h"
 
 namespace singa {
-
 /**
  * The neural network is constructed from user configurations in NetProto.
  *
@@ -60,23 +60,27 @@ class NeuralNet {
   * @param net_conf neural net configuration
   * @param num_partitions number of partitions; 1 for no partitioning.
    */
-  NeuralNet(NetProto netproto, int npartitions);
+  NeuralNet(NetProto net_conf, int num_partitions);
   ~NeuralNet();
   /**
    * To display the adjacency layers
-   */
   std::string ToAdjacency();
+   */
   /**
    * Share memory of parameter values from other neuralnet
    */
   void ShareParamsFrom(NeuralNet* other);
-  inline const std::vector<Layer*>& layers() { return layers_; }
+  inline const std::vector<Layer*>& layers() const { return layers_; }
   inline const std::vector<Param*>& params() const { return params_; }
   inline Layer* name2layer(std::string name) const {
-    if (name2layer_.find(name) != name2layer_.end())
-      return name2layer_.at(name);
-    else
-      return nullptr;
+    CHECK(name2layer_.find(name) != name2layer_.end())
+      << "No layer with name " << name;
+    return name2layer_.at(name);
+  }
+  inline const std::vector<Layer*>& srclayers(const Layer* layer) const {
+    CHECK(src_map_.find(layer) != src_map_.end())
+      << "layer (" << layer->name() << ") has no source layers";
+    return src_map_.at(layer);
   }
   inline Param* paramid2param(int id) const { return paramid2param_.at(id); }
 
@@ -90,11 +94,11 @@ class NeuralNet {
   * @param num_partitions number of partitions
    * @return neural net graph
    */
-  Graph* CreateGraph(const NetProto& netproto, int npartitions);
+  Graph* CreateGraph(const NetProto& netproto, int num_partitions);
   /**
    * Create neural net from graph, one layer per node.
    */
-  void CreateNetFromGraph(Graph* graph, int npartitions);
+  void CreateNetFromGraph(Graph* graph, int num_partitions);
   /**
    * prepare data structures, e.g., params_, layers_, etc.
    */
@@ -104,8 +108,9 @@ class NeuralNet {
   std::vector<Layer*> layers_;
   std::vector<Param*> params_;
 
-  std::map<std::string, Layer*> name2layer_;
-  std::map<int, Param*> paramid2param_;
+  std::unordered_map<std::string, Layer*> name2layer_;
+  std::unordered_map<int, Param*> paramid2param_;
+  std::unordered_map<const Layer*, std::vector<Layer*>> src_map_;
 };
 
 }  // namespace singa
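
Since the srclayers_/dstlayers_ fields were removed from Layer, code that used to reach
a source blob through the layer itself now goes through the NeuralNet (or receives the
sources as an argument, as in the layer methods). A small migration sketch using only
the accessors declared in this header:

#include "neuralnet/neuralnet.h"

// Old style (removed in this commit):  srclayers_[0]->data(this)
// New style, given the owning net:
const singa::Blob<float>& FirstSrcData(singa::NeuralNet* net, singa::Layer* layer) {
  const std::vector<singa::Layer*>& srcs = net->srclayers(layer);  // CHECKs that sources exist
  return srcs.at(0)->data(layer);
}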

