singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wang...@apache.org
Subject [06/12] incubator-singa git commit: Transfer code from nusinga repo to singa apache repo. New commuinication framework is implemented to unify the frameworks of existing distributed deep learning systems. Communication is now implmented using ZeroMQ. API
Date Sun, 03 May 2015 14:04:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/common.h
----------------------------------------------------------------------
diff --git a/include/utils/common.h b/include/utils/common.h
new file mode 100644
index 0000000..993c153
--- /dev/null
+++ b/include/utils/common.h
@@ -0,0 +1,51 @@
+#ifndef INCLUDE_UTILS_COMMON_H_
+#define INCLUDE_UTILS_COMMON_H_
+#pragma once
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+#include <google/protobuf/message.h>
+#include <stdarg.h>
+#include <thread>         // std::this_thread::sleep_for
+#include <chrono>
+#include <string>
+#include <vector>
+#include <mutex>
+#include <queue>
+#include <sys/stat.h>
+#include <map>
+
+using std::vector;
+using std::string;
+using std::map;
+using google::protobuf::Message;
+
+#ifndef GFLAGS_GFLAGS_H_
+namespace gflags = google;
+#endif  // GFLAGS_GFLAGS_H_
+
+
+namespace singa {
+
+void ReadProtoFromTextFile(const char* filename, Message* proto) ;
+void WriteProtoToTextFile(const Message& proto, const char* filename) ;
+void ReadProtoFromBinaryFile(const char* filename, Message* proto) ;
+void WriteProtoToBinaryFile(const Message& proto, const char* filename);
+
+std::string IntVecToString(const vector<int>& vec) ;
+string StringPrintf(string fmt, ...) ;
+void Debug() ;
+inline bool check_exists(const std::string& name) {
+    struct stat buffer;
+    return (stat (name.c_str(), &buffer) == 0);
+}
+
+inline void Sleep(int millisec=1){
+  std::this_thread::sleep_for(std::chrono::milliseconds(millisec));
+}
+
+inline float rand_real(){
+  return  static_cast<float>(rand())/(RAND_MAX+1.0f);
+}
+
+} /* singa */
+#endif  // INCLUDE_UTILS_COMMON_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/data_shard.h
----------------------------------------------------------------------
diff --git a/include/utils/data_shard.h b/include/utils/data_shard.h
new file mode 100644
index 0000000..2ebade9
--- /dev/null
+++ b/include/utils/data_shard.h
@@ -0,0 +1,145 @@
+#ifndef INCLUDE_UTILS_SHARD_H_
+#define INCLUDE_UTILS_SHARD_H_
+
+#include <google/protobuf/message.h>
+#include <fstream>
+#include <string>
+#include <unordered_set>
+
+
+using google::protobuf::Message;
+
+namespace singa {
+
+/**
+ * Data shard stores training/validation/test tuples.
+ * Every worker node should have a training shard (validation/test shard
+ * is optional). The shard file for training is
+ * singa::Cluster::workspace()/train/shard.dat; The shard file for validation
+ * is singa::Cluster::workspace()/train/shard.dat; Similar path for test.
+ *
+ * shard.dat consists of a set of unordered tuples. Each tuple is
+ * encoded as [key_len key record_len val] (key_len and record_len are of type
+ * uint32, which indicate the bytes of key and record respectively.
+ *
+ * When Shard obj is created, it will remove the last key if the record size and
+ * key size do not match because the last write of tuple crashed.
+ *
+ * TODO
+ * 1. split one shard into multile shards.
+ * 2. add threading to prefetch and parse records
+ *
+ */
+class DataShard {
+ public:
+  enum {
+    //!< read only mode used in training
+    kRead=0,
+    //!< write mode used in creating shard (will overwrite previous one)
+    kCreate=1,
+    //!< append mode, e.g. used when previous creating crashes
+    kAppend=2
+  };
+
+ public:
+  /**
+   * Init the shard obj.
+   * @folder shard folder (path excluding shard.dat) on worker node
+   * @mode shard open mode, Shard::kRead, Shard::kWrite or Shard::kAppend
+   * @bufsize batch bufsize bytes data for every disk op (read or write),
+   * default is 100MB
+   */
+  DataShard(std::string folder, char mode, int capacity=104857600);
+  ~DataShard();
+
+  /**
+   * read next tuple from the shard.
+   * @key key
+   * @param val record of type Message
+   * @return true if read success otherwise false, e.g., the tuple was not
+   * inserted completely.
+   */
+  bool Next(std::string *key, Message* val);
+  /**
+   * read next tuple from the shard.
+   * @key key tuple key
+   * @param val record of type string
+   * @return true if read success otherwise false, e.g., the tuple was not
+   * inserted completely.
+   */
+  bool Next(std::string *key, std::string* val);
+
+  /**
+   * Append one tuple to the shard.
+   * @param key e.g., image path
+   * @param val
+   * @return reture if sucess, otherwise false, e.g., inserted before
+   */
+  bool Insert(const std::string& key, const Message& tuple);
+  /**
+   * Append one tuple to the shard.
+   * @param key e.g., image path
+   * @param val
+   * @return reture if sucess, otherwise false, e.g., inserted before
+   */
+  bool Insert(const std::string& key, const std::string& tuple);
+  /**
+   * Move the read pointer to the head of the shard file.
+   * Used for repeated reading.
+   */
+  void SeekToFirst();
+  /**
+   * Flush buffered data to disk.
+   * Used only for kCreate or kAppend.
+   */
+  void Flush() ;
+  /**
+   * Iterate through all tuples to get the num of all tuples.
+   * @return num of tuples
+   */
+  const int Count();
+  /**
+   * @return path to shard file
+   */
+  const std::string path(){
+    return path_;
+  }
+
+ protected:
+  /**
+   * Read the next key and prepare buffer for reading value.
+   * @param key
+   * @return length (i.e., bytes) of value field.
+   */
+  int Next(std::string *key);
+  /**
+   * Setup the disk pointer to the right position for append in case that
+   * the pervious write crashes.
+   * @param path shard path.
+   * @return offset (end pos) of the last success written record.
+   */
+  int PrepareForAppend(std::string path);
+  /**
+   * Read data from disk if the current data in the buffer is not a full field.
+   * @param size size of the next field.
+   */
+  bool PrepareNextField(int size);
+
+ private:
+  char mode_;
+  std::string path_;
+  // either ifstream or ofstream
+  std::fstream fdat_;
+  // to avoid replicated record
+  std::unordered_set<std::string> keys_;
+  // internal buffer
+  char* buf_;
+  // offset inside the buf_
+  int offset_;
+  // allocated bytes for the buf_
+  int capacity_;
+  // bytes in buf_, used in reading
+  int bufsize_;
+};
+} /* singa */
+#endif  // INCLUDE_UTILS_SHARD_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/factory.h
----------------------------------------------------------------------
diff --git a/include/utils/factory.h b/include/utils/factory.h
new file mode 100644
index 0000000..c8fef32
--- /dev/null
+++ b/include/utils/factory.h
@@ -0,0 +1,57 @@
+#ifndef INCLUDE_UTILS_FACTORY_H_
+#define INCLUDE_UTILS_FACTORY_H_
+#include <glog/logging.h>
+
+#include <functional>
+#include <utility>
+#include <map>
+/**
+ * macro that creats a function which instantiate a subclass instance and
+ * returns pointer to the base class.
+ */
+#define CreateInstance(SubClass, BaseClass) \
+  [](void)->BaseClass* {return new SubClass();}
+
+/**
+ * factory template to generate class (or a sub-class) object  based on id.
+ * 1. register class creation function that generates a class
+ * object based on id.
+ * 2. call Create() func to call the creation function and return
+ * a pointer to the base calss.
+ */
+
+template<typename T>
+class Factory{
+ //template<Factory<T>> friend class Singleton;
+ public:
+  /**
+   * Register functions to create user defined classes.
+   * This function is called by the REGISTER_FACTORY macro.
+   * @param id identifier of the creating function/class
+   * @param create_function a function that creates a layer instance
+   */
+  void Register(const std::string id, std::function<T*(void)> func);
+  /**
+   * create a layer  instance by providing its type
+   * @param type the identifier of the layer to be created
+   */
+  T *Create(const std::string id);
+
+ private:
+  //<! Map that stores the registered creation functions
+  std::map<std::string, std::function<T*(void)>> str2func_;
+};
+
+template<typename T>
+void Factory<T>::Register(const std::string id,
+                                        std::function<T*(void)> func) {
+  str2func_[id] = func;
+}
+
+template<typename T>
+T *Factory<T>::Create(const std::string id) {
+  CHECK(str2func_.find(id) != str2func_.end())
+      << "The creation function for " << id << " has not been registered";
+  return str2func_[id]();
+}
+#endif // INCLUDE_UTILS_FACTORY_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/graph.h
----------------------------------------------------------------------
diff --git a/include/utils/graph.h b/include/utils/graph.h
new file mode 100644
index 0000000..ca582b5
--- /dev/null
+++ b/include/utils/graph.h
@@ -0,0 +1,150 @@
+#ifndef INCLUDE_UTILS_GRAPH_H_
+#define INCLUDE_UTILS_GRAPH_H_
+#include <glog/logging.h>
+#include <vector>
+#include <string>
+#include <map>
+#include <stack>
+#include <memory>
+
+using std::vector;
+using std::string;
+using std::map;
+using std::pair;
+using std::shared_ptr;
+using std::make_shared;
+
+
+typedef struct _LayerInfo{
+  // origin identifies the origin of this node, i.e., the corresponding layer
+  string origin;
+  int locationid;// locationidation id;
+  int partitionid;
+  int slice_dimension;
+  int concate_dimension;
+}LayerInfo;
+typedef LayerInfo V;
+
+
+class Node;
+typedef shared_ptr<Node> SNode;
+
+class Node{
+ public:
+  typedef shared_ptr<Node> SNode;
+  Node(string name): name_(name){}
+  Node(string name, const V& v):
+    name_(name), val_(v){}
+
+  void AddDstNode(SNode dstnode){
+    dstnodes_.push_back(dstnode);
+  }
+  void AddSrcNode(SNode srcnode){
+    srcnodes_.push_back(srcnode);
+  }
+
+  void RemoveDstNode(SNode dst){
+    auto iter=dstnodes_.begin();
+    while((*iter)->name_!=dst->name_&&iter!=dstnodes_.end()) iter++;
+    CHECK((*iter)->name_==dst->name_);
+    dstnodes_.erase(iter);
+  }
+  void RemoveSrcNode(SNode src){
+    auto iter=srcnodes_.begin();
+    while((*iter)->name_!=src->name_&&iter!=srcnodes_.end()) iter++;
+    CHECK((*iter)->name_==src->name_);
+    srcnodes_.erase(iter);
+  }
+  const string& name() const {return name_;}
+  const V& val() const {return val_;}
+  const SNode srcnodes(int k) const {return srcnodes_[k]; }
+  const SNode dstnodes(int k) const {return dstnodes_[k]; }
+  const vector<SNode>& srcnodes() const {return srcnodes_; }
+  const vector<SNode>& dstnodes() const {return dstnodes_; }
+  int  dstnodes_size() const {return dstnodes_.size(); }
+  int  srcnodes_size() const {return srcnodes_.size(); }
+
+ private:
+  string name_;
+  vector<SNode> srcnodes_;
+  vector<SNode> dstnodes_;
+
+  V val_;
+    // properties
+  string color_, weight_, shape_;
+};
+
+
+/**
+ * For partition neuralnet and displaying the neuralnet structure
+ */
+class Graph{
+ public:
+  Graph(){}
+  void Sort();
+  const SNode& AddNode(string name, V origin){
+    nodes_.push_back(make_shared<Node>(name, origin));
+    name2node_[name]=nodes_.back();
+    return nodes_.back();
+  }
+  const SNode& AddNode(string name){
+    nodes_.push_back(make_shared<Node>(name));
+    name2node_[name]=nodes_.back();
+    return nodes_.back();
+  }
+
+  void AddEdge(SNode srcnode, SNode dstnode){
+    srcnode->AddDstNode(dstnode);
+    dstnode->AddSrcNode(srcnode);
+  }
+
+  void AddEdge(const string& src, const string& dst){
+    CHECK(name2node_.find(src)!=name2node_.end())<<"can't find src node "<<src;
+    CHECK(name2node_.find(dst)!=name2node_.end())<<"can't find dst node "<<dst;
+
+    SNode srcnode=name2node_[src], dstnode=name2node_[dst];
+    AddEdge(srcnode, dstnode);
+  }
+
+  void RemoveEdge(const string &src, const string& dst){
+    CHECK(name2node_.find(src)!=name2node_.end())<<"can't find src node "<<src;
+    CHECK(name2node_.find(dst)!=name2node_.end())<<"can't find dst node "<<dst;
+
+    SNode srcnode=name2node_[src], dstnode=name2node_[dst];
+    RemoveEdge(srcnode, dstnode);
+  }
+
+  void RemoveEdge(SNode src, SNode dst){
+    src->RemoveDstNode(dst);
+    dst->RemoveSrcNode(src);
+  }
+
+  const vector<SNode>& nodes() const{
+    return nodes_;
+  };
+
+  const SNode& node(string name) const{
+    CHECK(name2node_.find(name)!= name2node_.end())
+      <<"can't find dst node "<<name;
+    return name2node_.at(name);
+  }
+
+  const string ToString() const;
+  const string ToString(const map<string, string>& info) const ;
+
+  bool Check() const;
+
+  SNode InsertSliceNode(SNode srcnode, const vector<SNode>& dstnodes,
+      const V& info, bool connect_dst=true);
+  SNode InsertConcateNode(const vector<SNode>&srcnodes, SNode dstnode,
+      const V& info);
+  SNode InsertSplitNode(SNode srcnode, const vector<SNode>& dstnodes);
+  std::pair<SNode, SNode> InsertBridgeNode(SNode srcnode, SNode dstnode);
+  void topology_sort_inner(SNode node, map<string, bool> *visited,
+    std::stack<string> *stack);
+
+ private:
+  vector<SNode> nodes_;
+  map<string, SNode> name2node_;
+};
+#endif // INCLUDE_UTILS_GRAPH_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/param.h
----------------------------------------------------------------------
diff --git a/include/utils/param.h b/include/utils/param.h
new file mode 100644
index 0000000..907ef8c
--- /dev/null
+++ b/include/utils/param.h
@@ -0,0 +1,172 @@
+#ifndef INCLUDE_UTILS_PARAM_H_
+#define INCLUDE_UTILS_PARAM_H_
+#include <vector>
+#include <string>
+#include <map>
+#include <functional>
+#include "proto/model.pb.h"
+#include "utils/blob.h"
+#include "communication/msg.h"
+// Base paramter class.
+namespace singa {
+class Param {
+ public:
+  Param();
+  virtual ~Param();
+
+  virtual Msg* GenGetMsg(void* arg=nullptr);
+  virtual Msg* GenPutMsg(void* arg=nullptr);
+  virtual Msg* GenUpdateMsg(void* arg=nullptr);
+  virtual Msg* GenSyncMsg(void* arg=nullptr);
+
+  virtual Msg* HandleGetMsg(Msg** msg);
+  virtual Msg* HandlePutMsg(Msg** msg);
+  virtual int ParseUpdateMsg(Msg** msg);
+  virtual Msg* GenUpdateResponseMsg(void* arg=nullptr);
+  virtual Msg* HandleSyncMsg(Msg** msg);
+
+  virtual int ParseGetResponseMsg(Msg** msg);
+  virtual int ParsePutResponseMsg(Msg** msg);
+  virtual int ParseUpdateResponseMsg(Msg** msg);
+  virtual int ParseSyncResponseMsg(Msg** msg);
+
+  /**
+   * setup param shape
+   */
+  virtual void Setup(const ParamProto& proto, const std::vector<int>& shape, int fan_in);
+  /*
+   * fill the data according to initmethod, i.e., random/gaussian/fixed value
+   */
+  virtual void Init(int v=0);
+  void ShareData(shared_ptr<Param> other){
+    owner_=other->id();
+    CHECK(std::equal(data_.shape().begin(), data_.shape().end(),
+          other->data_.shape().begin()));
+    data_.ShareData(other->data_);
+  }
+  float learning_rate_multiplier() {
+    return proto_.learning_rate_multiplier();
+  }
+  float weight_decay_multiplier() {
+    return proto_.weight_decay_multiplier();
+  }
+  /*
+  const int split_threshold(){
+    return proto_.split_threshold();
+  }
+  */
+  /**
+   * if the Param shares data with others, then point to the owner.
+   * otherwise points to itself.
+   */
+  const int owner() const{
+    return owner_;
+  }
+  const std::string& name() {
+    return proto_.name();
+  }
+
+  int id() const{
+    return proto_.id();
+  }
+  void set_id(int id){
+    proto_.set_id(id);
+  }
+
+  int version() const {
+    return proto_.version(); // TODO store version in data blob
+  }
+  void set_version(int v) {
+    proto_.set_version(v); // TODO read version from data blob
+  }
+   /**
+    * @return num of floats.
+    */
+  int size() const {
+    return data_.count();
+  }
+  /**
+   * Return const mem address for the content of this parameter
+   */
+  const Blob<float> &data() {
+    return data_;
+  }
+  Blob<float> *mutable_data() {
+    return &data_;
+  }
+  /**
+   * Return gradient of this parameter
+   */
+  const Blob<float> &grad() {
+    return grad_;
+  }
+  Blob<float> *mutable_grad() {
+    return &grad_;
+  }
+
+  const Blob<float> &history() {
+    return history_;
+  }
+  Blob<float> *mutable_history() {
+    return &history_;
+  }
+
+  float* mutable_cpu_data(){
+    return data_.mutable_cpu_data();
+  }
+  float* mutable_cpu_grad(){
+    return grad_.mutable_cpu_data();
+  }
+  float* mutable_cpu_history(){
+    return history_.mutable_cpu_data();
+  }
+ protected:
+  /**
+   * name of the parameter used to share wights between neuralnets
+   */
+  std::string name_;
+  //! content, gradient, history gradient of this parameter
+  Blob<float> data_, grad_, history_;
+  int owner_;
+
+  ParamProto proto_;
+  int fan_in_;
+};
+/**
+ * Sync with server by randomly sampling some parameters for every sync.
+class RandomSyncParam: public Param{
+ public:
+  virtual zmsg_t* HandleSyncMsg(zmsg_t** msg);
+  virtual zmsg_t *GenSyncMsgFromWorker(float sample_ratio);
+  virtual void ParseSyncMsgFromPS(zmsg_t** msg);
+  virtual void Setup(const ParamProto& proto, const vector<int>& shape, int fan_in);
+  virtual void Init();
+
+  float* mutable_cpu_snapshot(){
+    return snapshot_.mutable_cpu_data();
+  }
+  const float* cpu_snapshot(){
+    return snapshot_.cpu_data();
+  }
+
+ protected:
+  const vector<int> RandomSample(int seed, int m, int n);
+
+
+  Blob<float> snapshot_;
+};
+ */
+/**
+ * Sync with server by elastic SGD see http://arxiv.org/abs/1412.6651.
+class ElasticParam: public Param{
+ public:
+  virtual zmsg_t* HandleSyncMsg(zmsg_t** msg);
+  virtual zmsg_t *GenSyncMsgFromWorker(float moving_rate);
+  virtual void ParseSyncMsgFromPS(zmsg_t** msg);
+};
+ */
+
+
+}  // namespace singa
+
+#endif  // INCLUDE_UTILS_PARAM_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/singleton.h
----------------------------------------------------------------------
diff --git a/include/utils/singleton.h b/include/utils/singleton.h
new file mode 100644
index 0000000..2e2bdfb
--- /dev/null
+++ b/include/utils/singleton.h
@@ -0,0 +1,41 @@
+#ifndef INCLUDE_UTILS_SINGLETON_H_
+#define INCLUDE_UTILS_SINGLETON_H_
+
+template<typename T>
+class Singleton {
+ public:
+  static T* Instance() {
+    if (data_==nullptr) {
+      data_ = new T();
+    }
+    return data_;
+  }
+ private:
+  static T* data_;
+};
+
+template<typename T> T* Singleton<T>::data_ = nullptr;
+
+
+/**
+ * Singleton initiated with argument
+ */
+template<typename T, typename X=int>
+class ASingleton {
+ public:
+  static T* Instance(){
+    return data_;
+  }
+  static T* Instance(X x) {
+    if (data_==nullptr) {
+      data_ = new T(x);
+    }
+    return data_;
+  }
+ private:
+  static T* data_;
+};
+
+template<typename T, typename X> T* ASingleton<T,X>::data_ = nullptr;
+
+#endif // INCLUDE_UTILS_SINGLETON_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/updater.h
----------------------------------------------------------------------
diff --git a/include/utils/updater.h b/include/utils/updater.h
new file mode 100644
index 0000000..2a6dd43
--- /dev/null
+++ b/include/utils/updater.h
@@ -0,0 +1,78 @@
+#ifndef INCLUDE_UTILS_UPDATER_H_
+#define INCLUDE_UTILS_UPDATER_H_
+#include "proto/model.pb.h"
+#include "utils/param.h"
+
+namespace singa{
+/**
+ * Updater for Param.
+ */
+class Updater{
+ public:
+  virtual void Init(const UpdaterProto &proto){
+    proto_=proto;
+  }
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f)=0;
+
+  float GetLearningRate(int step);
+ protected:
+  UpdaterProto proto_;
+};
+class SGDUpdater : public Updater{
+ public:
+  virtual void Init(const UpdaterProto& proto);
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);
+
+ protected:
+  float base_lr_;
+  float momentum_;
+  float weight_decay_;
+};
+class NesterovUpdater : public Updater{
+ public:
+  virtual void Init(const UpdaterProto& proto);
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);
+
+ protected:
+  float base_lr_;
+  float momentum_;
+  float weight_decay_;
+};
+class AdaGradUpdater : public Updater{
+ public:
+  virtual void Init(const UpdaterProto& proto);
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);
+
+ protected:
+  float base_lr_;
+  float delta_;
+  float weight_decay_;
+};
+
+class RMSPropUpdater : public Updater{
+ public:
+  virtual void Init(const UpdaterProto& proto);
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);
+
+ protected:
+  float base_lr_;
+  float delta_;
+  float rho_;
+  float weight_decay_;
+};
+
+/*
+class AdaDeltaUpdater : public Updater{
+ public:
+  virtual void Init(const UpdaterProto& proto);
+  virtual void Update(int step, shared_ptr<Param> param, float grad_scale=1.0f);
+
+ protected:
+  float rho_;
+  float delta_;
+  float weight_decay_;
+};
+*/
+}
+
+#endif // INCLUDE_UTILS_UPDATER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/script/node.sh
----------------------------------------------------------------------
diff --git a/script/node.sh b/script/node.sh
new file mode 100755
index 0000000..74e0d8a
--- /dev/null
+++ b/script/node.sh
@@ -0,0 +1,71 @@
+#!/bin/bash
+if [[ $# < 2 || ! -f $2 ]]
+then
+  echo "Usage: process/folder management"
+  echo "[cat, create, delete, kill, ls, ps, reset, scp, ssh] hostfile [args]"
+  echo "   cat hostfile file--- cat the file on every node in hostfile"
+  echo "   create hostfile folder--- create the folder on every node in hostfile"
+  echo "   delete hostfile folder--- delete the folder on every node in hostfile"
+  echo "   kill hostfile job_name---  kill the job on every node in hostfile"
+  echo "   ls hostfile folder--- list the folder on every node in hostfile"
+  echo "   ps hostfile job_name---  ps aux|grep job_name on every node in hostfile"
+  echo "   reset hostfile folder--- delete and create the folder on every node in hostfile"
+  echo "   scp hostfile local_dir [remote_dir]--- copy the local_dir to remote_dir on every node in hostfile, if remote_dir is omitted, remote_dir=local_dir"
+  echo "   ssh hostfile--- test whether the nodes in hostfile are alive"
+  echo "each line in hostfile is a node name followed by a space and other fields"
+  exit
+fi
+
+ssh_options="-oStrictHostKeyChecking=no \
+-oUserKnownHostsFile=/dev/null \
+-oLogLevel=quiet"
+
+hosts=(`cat $2 |cut -d ' ' -f 1`)
+
+for i in ${hosts[@]}
+do
+  if [ $1 == "cat" ]
+  then
+    cmd="cat $3"
+  elif [ $1 == "create" -o $1 == "reset" ]
+  then
+    cmd="mkdir -p $3"
+  elif [ $1 == "delete" -o $1 == "reset" ]
+  then
+    cmd="rm -rf $3"
+  elif [ $1 == "kill" ]
+  then
+    cmd="ps ax|pgrep $3 |xargs kill"
+  elif [ $1 == "ls" ]
+  then
+    cmd="ls -l $3"
+  elif [ $1 == "scp" ]
+  then
+    local_dir=$3
+    remote_dir=$3
+    if [ $# -eq 4 ]
+    then
+      remote_dir=$4
+    fi
+    r=''
+    if [[ -d $3 ]]
+    then
+      r='-r'
+    fi
+    echo "scp $r $local_dir $i:$remote_dir"
+    scp $r $local_dir $i:$remote_dir
+  elif [ $1 == "ssh" ]
+  then
+    cmd="exit"
+  elif [ $1 == "ps" ]
+  then
+    cmd="ps ax|pgrep $3"
+  else
+    echo "Incorrect commands:" $1
+  fi
+  if [ $1 != "scp" ]
+  then
+    echo $cmd
+    ssh $i $cmd
+  fi
+done

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/communication/msg.cc
----------------------------------------------------------------------
diff --git a/src/communication/msg.cc b/src/communication/msg.cc
new file mode 100644
index 0000000..80f2304
--- /dev/null
+++ b/src/communication/msg.cc
@@ -0,0 +1,5 @@
+#include "communication/msg.h"
+
+namespace singa {
+} /* singa */
+

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
new file mode 100644
index 0000000..279d758
--- /dev/null
+++ b/src/communication/socket.cc
@@ -0,0 +1,118 @@
+#include "communication/socket.h"
+
+namespace singa {
+Poller::Poller(){
+  poller_=zpoller_new(NULL);
+}
+
+void Poller::Add(Socket* socket){
+  zsock_t* zsock=static_cast<zsock_t*>(socket->InternalID());
+  zpoller_add(poller_, zsock);
+  zsock2Socket_[zsock]=socket;
+}
+
+Socket* Poller::Wait(int timeout){
+  zsock_t* sock=(zsock_t*)zpoller_wait(poller_, timeout);
+  if(sock!=NULL)
+    return zsock2Socket_[sock];
+  else return nullptr;
+}
+
+Dealer::Dealer(int id):id_(id){
+  dealer_=zsock_new(ZMQ_DEALER);
+  CHECK_NOTNULL(dealer_);
+  poller_=zpoller_new(dealer_);
+}
+
+int Dealer::Connect(string endpoint){
+  if(endpoint.length())
+    CHECK_EQ(zsock_connect(dealer_,endpoint.c_str()),0);
+  return 1;
+}
+int Dealer::Send(Msg *msg){
+  zmsg_t* zmsg=(static_cast<Msg*>(msg))->DumpToZmsg();
+  zmsg_send(&zmsg, dealer_);
+  delete msg;
+  return 1;
+}
+
+Msg* Dealer::Receive(){
+  zmsg_t* zmsg=zmsg_recv(dealer_);
+  if(zmsg==NULL)
+    return nullptr;
+  Msg* msg=new Msg();
+  msg->ParseFromZmsg(zmsg);
+  return msg;
+}
+Dealer::~Dealer(){
+  zsock_destroy(&dealer_);
+}
+
+Router::Router(int bufsize){
+  nBufmsg_=0;
+  bufsize_=bufsize;
+  router_=zsock_new(ZMQ_ROUTER);
+  CHECK_NOTNULL(router_);
+  poller_=zpoller_new(router_);
+}
+int Router::Bind(string endpoint){
+  if(endpoint.length())
+    CHECK_EQ(zsock_bind(router_, endpoint.c_str()),0);
+  return 1;
+}
+
+int Router::Send(Msg *msg){
+  zmsg_t* zmsg=static_cast<Msg*>(msg)->DumpToZmsg();
+  int dstid=static_cast<Msg*>(msg)->dst();
+  if(id2addr_.find(dstid)!=id2addr_.end()){
+    // the connection has already been set up
+    zframe_t* addr=zframe_dup(id2addr_[dstid]);
+    zmsg_prepend(zmsg, &addr);
+    zmsg_send(&zmsg, router_);
+  }else{
+    // the connection is not ready, buffer the message
+    if(bufmsg_.size()==0)
+      nBufmsg_=0;
+    bufmsg_[dstid].push_back(zmsg);
+    nBufmsg_++;
+    CHECK_LE(nBufmsg_, bufsize_);
+  }
+  delete msg;
+  return 1;
+}
+
+Msg* Router::Receive(){
+  zmsg_t* zmsg=zmsg_recv(router_);
+  if(zmsg==NULL)
+    return nullptr;
+  zframe_t* dealer=zmsg_pop(zmsg);
+  Msg* msg=new Msg();
+  msg->ParseFromZmsg(zmsg);
+  if (id2addr_.find(msg->src())==id2addr_.end()){
+    // new connection, store the sender's identfier and send buffered messages
+    // for it
+    id2addr_[msg->src()]=dealer;
+    if(bufmsg_.find(msg->src())!=bufmsg_.end()){
+      for(auto& it: bufmsg_.at(msg->src())){
+        zframe_t* addr=zframe_dup(dealer);
+        zmsg_prepend(it, &addr);
+        zmsg_send(&it, router_);
+      }
+      bufmsg_.erase(msg->src());
+    }
+  }
+  else
+    zframe_destroy(&dealer);
+  return msg;
+}
+
+Router::~Router(){
+  zsock_destroy(&router_);
+  for(auto it: id2addr_)
+    zframe_destroy(&it.second);
+  for(auto it: bufmsg_){
+    for(auto *msg: it.second)
+      zmsg_destroy(&msg);
+  }
+}
+} /* singa */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/main.cc
----------------------------------------------------------------------
diff --git a/src/main.cc b/src/main.cc
new file mode 100644
index 0000000..89306d8
--- /dev/null
+++ b/src/main.cc
@@ -0,0 +1,49 @@
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include "trainer/trainer.h"
+
+/**
+ * \file main.cc is the main entry of SINGA, like the driver program for Hadoop.
+ *
+ * 1. Users register their own implemented classes, e.g., layer, updater, etc.
+ * 2. Users prepare the google protobuf object for the model configuration and
+ * the cluster configuration.
+ * 3. Users call trainer to start the training.
+ *
+ * TODO
+ * 1. Add the resume function to continue training from a previously stopped
+ * point.
+ * 2. Add helper functions for users to configure their model and cluster
+ * easily, e.g., AddLayer(layer_type, source_layers, meta_data).
+ */
+
+DEFINE_int32(procsID, 0, "Global process ID");
+DEFINE_string(cluster, "examples/mnist/cluster.conf", "Cluster config file");
+DEFINE_string(model, "examples/mnist/conv.conf", "Model config file");
+
+/**
+ * Register layers, and other customizable classes.
+ *
+ * If users want to use their own implemented classes, they should register
+ * them here. Refer to the Worker::RegisterDefaultClasses()
+ */
+void RegisterClasses(const singa::ModelProto& proto){
+}
+
+int main(int argc, char **argv) {
+  // TODO set log dir
+  google::InitGoogleLogging(argv[0]);
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  singa::ClusterProto cluster;
+  singa::ReadProtoFromTextFile(FLAGS_cluster.c_str(), &cluster);
+  singa::ModelProto model;
+  singa::ReadProtoFromTextFile(FLAGS_model.c_str(), &model);
+  LOG(INFO)<<"The cluster config is\n"<<cluster.DebugString();
+  LOG(INFO)<<"The model config is\n"<<model.DebugString();
+
+  RegisterClasses(model);
+  singa::Trainer trainer;
+  trainer.Start(model, cluster, FLAGS_procsID);
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/neuralnet/base_layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/base_layer.cc b/src/neuralnet/base_layer.cc
new file mode 100644
index 0000000..50fc396
--- /dev/null
+++ b/src/neuralnet/base_layer.cc
@@ -0,0 +1,194 @@
+#include <opencv2/highgui/highgui.hpp>
+#include <opencv2/imgproc/imgproc.hpp>
+#include <cblas.h>
+#include <math.h>
+#include <cfloat>
+#include "neuralnet/base_layer.h"
+namespace singa {
+/*****************************************************************************
+ * Implementation for Layer
+ *****************************************************************************/
+void Layer::Init(const LayerProto &proto) {
+  layer_proto_=proto;
+}
+
+void Layer::Init(const Layer& other, const vector<int>& shape){
+  data_.Reshape(shape);
+  grad_.Reshape(shape);
+  layer_proto_=other.layer_proto_;
+}
+void Layer::Setup(){
+  Setup(layer_proto_, srclayers_);
+}
+void Layer::SetupAfterPartition(){
+  vector<int> shape=data_.shape();
+  SetupAfterPartition(layer_proto_, shape, srclayers_);
+  //LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+  CHECK(std::equal(shape.begin(), shape.end(), data_.shape().begin()))<<name()
+    <<IntVecToString(shape)<<"--"<<IntVecToString(data_.shape());
+}
+void Layer::ComputeFeature(bool training){
+  ComputeFeature(training, srclayers_);
+}
+void Layer::ComputeGradient(){
+  ComputeGradient(srclayers_);
+}
+
+void Layer::ToProto(LayerProto *proto, bool copyData) {
+}
+void BridgeSrcLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  grad_.ReshapeLike(data_);
+}
+void BridgeSrcLayer::SetupAfterPartition(){
+  Setup(layer_proto_, srclayers_);
+  //LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+}
+
+void BridgeSrcLayer::ComputeFeature(bool training,
+    const vector<SLayer>& srclayers){
+  if(training)
+    ready_=false;
+  else
+    ready_=true;
+}
+void BridgeSrcLayer::ComputeGradient(const vector<SLayer>& srclayers){
+
+}
+void BridgeDstLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  grad_.ReshapeLike(data_);
+}
+void BridgeDstLayer::SetupAfterPartition(){
+  Setup(layer_proto_, srclayers_);
+  //LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+}
+
+void BridgeDstLayer::ComputeFeature(bool training,
+    const vector<SLayer>& srclayers){
+  if(training)
+    ready_=true;
+  else
+    ready_=false;
+}
+void BridgeDstLayer::ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){
+
+}
+
+/*******************************
+ * Implementation for ConcateLayer
+ *******************************/
+void ConcateLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  size_t concate_dim=proto.concate_param().concate_dimension();
+  CHECK_GE(concate_dim,0);
+  CHECK_GT(srclayers.size(),1);
+  vector<int> shape=srclayers[0]->data(this).shape();
+  for(size_t i=1;i<srclayers.size();i++){
+    const vector<int>& srcshape=srclayers[i]->data(this).shape();
+    for(size_t j=0;j<shape.size();j++)
+      if(j==concate_dim)
+        shape[j]+=srcshape[j];
+      else
+        CHECK_EQ(shape[j], srcshape[j]);
+  }
+  data_.Reshape(shape);
+  grad_.Reshape(shape);
+}
+
+void ConcateLayer::SetupAfterPartition(){
+  Setup(layer_proto_, srclayers_);
+//  LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+}
+
+void ConcateLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){}
+
+void ConcateLayer::ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){}
+/*****************************************************************************
+ * Implementation for SliceLayer
+ *****************************************************************************/
+void SliceLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  int slice_dim=proto.slice_param().slice_dimension();
+  int slice_num=proto.slice_param().slice_num();
+  CHECK_GE(slice_dim,0);
+  CHECK_EQ(slice_num, dstlayers_.size());
+  data_.Reshape(srclayers[0]->data(this).shape());
+  grad_.ReshapeLike(data_);
+  datavec_.resize(slice_num);
+  gradvec_.resize(slice_num);
+  //LOG(ERROR)<<"slice dim "<<slice_dim<<" slice num "<<slice_num;
+  for(int i=0;i<slice_num;i++){
+    vector<int> newshape(data_.shape());
+    newshape[slice_dim]=newshape[slice_dim]/slice_num+
+      ((i==slice_num-1)?newshape[slice_dim]%slice_num:0);
+    datavec_[i].Reshape(newshape);
+    gradvec_[i].Reshape(newshape);
+    //LOG(ERROR)<<"slice "<<IntVecToString(newshape);
+  }
+}
+
+void SliceLayer::SetupAfterPartition(){
+  Setup(layer_proto_, srclayers_);
+  //LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+}
+
+
+int SliceLayer::SliceID(const Layer* layer) const {
+  CHECK(layer!= nullptr);
+  for(size_t i=0;i<datavec_.size();i++){
+    //LOG(ERROR)<<"get slice "<<IntVecToString(shapes_[i]);
+    if(dstlayers_[i].get() == layer)
+      return i;
+  }
+  CHECK(false);
+  return -1;
+}
+
+const Blob<float>& SliceLayer::data(const Layer* layer) const {
+  if(layer==nullptr)
+    return data_;
+  return datavec_[SliceID(layer)];
+}
+const Blob<float>& SliceLayer::grad(const Layer* layer) const {
+  if(layer==nullptr)
+    return grad_;
+  return gradvec_[SliceID(layer)];
+}
+Blob<float>* SliceLayer::mutable_data(const Layer* layer) {
+  if(layer==nullptr)
+    return &data_;
+  return &datavec_[SliceID(layer)];
+}
+Blob<float>* SliceLayer::mutable_grad(const Layer* layer){
+  if(layer==nullptr)
+    return &grad_;
+  return &gradvec_[SliceID(layer)];
+}
+void SliceLayer::ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers){}
+void SliceLayer::ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){}
+
+void SplitLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  grad_.Reshape(srclayers[0]->data(this).shape());
+}
+
+void SplitLayer::SetupAfterPartition(){
+  Setup(layer_proto_, srclayers_);
+  //LOG(ERROR)<<name()<<":"<<IntVecToString(shape_);
+}
+void SplitLayer::ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers){
+
+}
+void SplitLayer::ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){
+
+}
+
+}  // namespace singa
+

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/neuralnet/layer.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/layer.cc b/src/neuralnet/layer.cc
new file mode 100644
index 0000000..d45bcc0
--- /dev/null
+++ b/src/neuralnet/layer.cc
@@ -0,0 +1,781 @@
+#include <glog/logging.h>
+#include <memory>
+#include <algorithm>
+#include <opencv2/highgui/highgui.hpp>
+#include <opencv2/imgproc/imgproc.hpp>
+#include "mshadow/tensor.h"
+#include "mshadow/cxxnet_op.h"
+#include "neuralnet/layer.h"
+#include "utils/singleton.h"
+#include "utils/factory.h"
+
+using namespace mshadow;
+using namespace mshadow::expr;
+
+namespace singa {
+
+/************ Implementation for ConvProductLayer*************************/
+void ConvolutionLayer::Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  ConvolutionProto conv_param=proto.convolution_param();
+  kernel_=conv_param.kernel();
+  CHECK_GT(kernel_, 0) << "Filter size cannot be zero.";
+  pad_=conv_param.pad();
+  stride_=conv_param.stride();
+  num_filters_=conv_param.num_filters();
+  const vector<int>& srcshape=srclayers[0]->data(this).shape();
+  int dim=srcshape.size();
+  CHECK_GT(dim, 2);
+  width_=srcshape[dim-1];
+  height_=srcshape[dim-2];
+  if(dim>3)
+    channels_=srcshape[dim-3];
+  else if(dim>2)
+    channels_=1;
+  batchsize_=srcshape[0];
+  conv_height_=(height_ + 2 * pad_ - kernel_) / stride_ + 1;
+  conv_width_= (width_ + 2 * pad_ - kernel_) / stride_ + 1;
+  col_height_=channels_*kernel_*kernel_;
+  col_width_=conv_height_*conv_width_;
+  vector<int> shape{batchsize_, num_filters_, conv_height_, conv_width_};
+  data_.Reshape(shape);
+  grad_.Reshape(shape);
+  col_data_.Reshape(vector<int>{col_height_, col_width_});
+  col_grad_.Reshape(vector<int>{col_height_, col_width_});
+
+  Factory<Param>* factory=Singleton<Factory<Param>>::Instance();
+  weight_=shared_ptr<Param>(factory->Create("Param"));
+  weight_->Setup(proto.param(0), vector<int>{num_filters_, col_height_}, col_height_);
+  bias_=shared_ptr<Param>(factory->Create("Param"));
+  bias_->Setup(proto.param(1), vector<int>{num_filters_},0);
+}
+
+void ConvolutionLayer::SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){
+  LayerProto newproto(proto);
+  ConvolutionProto *conv_param=newproto.mutable_convolution_param();
+  conv_param->set_num_filters(shape[1]);
+  Setup(newproto, srclayers);
+}
+
+void ConvolutionLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){
+  Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
+      Shape4(batchsize_, channels_, height_, width_));
+  Tensor<cpu, 3> data(data_.mutable_cpu_data(),
+      Shape3(batchsize_, num_filters_, conv_height_* conv_width_));
+  Tensor<cpu, 2> col(col_data_.mutable_cpu_data(),
+      Shape2(col_height_, col_width_));
+  Tensor<cpu, 2> weight(weight_->mutable_cpu_data(),
+      Shape2(num_filters_, col_height_));
+  Tensor<cpu, 1> bias(bias_->mutable_cpu_data(),
+      Shape1(num_filters_));
+
+  for(int n=0;n<batchsize_;n++){
+    if(pad_>0)
+      col=unpack_patch2col(pad(src[n], pad_), kernel_, stride_);
+    else
+      col=unpack_patch2col(src[n], kernel_, stride_);
+    data[n]=dot(weight, col);
+  }
+  data+=broadcast<1>(bias, data.shape);
+}
+
+void ConvolutionLayer::ComputeGradient(const vector<SLayer>& srclayers) {
+  Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
+      Shape4(batchsize_, channels_, height_, width_));
+  Tensor<cpu, 2> col(col_data_.mutable_cpu_data(),
+      Shape2(col_height_, col_width_));
+  Tensor<cpu, 2> weight(weight_->mutable_cpu_data(),
+      Shape2(num_filters_, col_height_));
+
+  Blob<float>* gsrcblob=srclayers[0]->mutable_grad(this);
+  Tensor<cpu, 4> gsrc(Shape4(batchsize_, channels_, height_, width_));
+  if(gsrcblob!=nullptr)
+    gsrc.dptr=gsrcblob->mutable_cpu_data();
+  Tensor<cpu, 3> grad(grad_.mutable_cpu_data(),
+      Shape3(batchsize_, num_filters_, conv_height_* conv_width_));
+  Tensor<cpu, 2> gcol(col_grad_.mutable_cpu_data(),
+      Shape2(col_height_, col_width_));
+  Tensor<cpu, 2> gweight(weight_->mutable_cpu_grad(),
+      Shape2(num_filters_, col_height_));
+  Tensor<cpu, 1> gbias(bias_->mutable_cpu_grad(),
+      Shape1(num_filters_));
+
+  gweight=0.0f;
+  gbias=sumall_except_dim<1>(grad);
+  Shape<3> padshape(gsrc.shape.SubShape());
+  padshape[0]+=2*pad_;padshape[1]+=2*pad_;
+  Shape<2> imgshape=Shape2(height_, width_);
+  for(int n=0;n<batchsize_;n++){
+    if(pad_>0)
+      col=unpack_patch2col(pad(src[n], pad_), kernel_, stride_);
+    else
+      col=unpack_patch2col(src[n], kernel_, stride_);
+    gweight+=dot(grad[n], col.T());
+
+    if(gsrcblob!=nullptr){
+      gcol=dot(weight.T(), grad[n]);
+      gsrc[n]=crop(pack_col2patch(gcol, padshape, kernel_, stride_), imgshape);
+    }
+  }
+}
+
+/****************** Implementation for DropoutLayer ***********************/
+void DropoutLayer::Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers){
+  data_.ReshapeLike(srclayers[0]->data(this));
+  grad_.ReshapeLike(*srclayers[0]->mutable_grad(this));
+  mask_.Reshape(srclayers[0]->data(this).shape());
+  pdrop_=proto.dropout_param().dropout_ratio();
+  unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
+  ASingleton<Random<cpu>>::Instance(seed);
+}
+
+void DropoutLayer::SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){
+  Setup(proto, srclayers);
+}
+
+void DropoutLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers) {
+  // check training
+  if(!training){
+    data_.CopyFrom(srclayers[0]->data());
+    return;
+  }
+  float pkeep=1-pdrop_;
+  Tensor<cpu, 1> mask(mask_.mutable_cpu_data(), Shape1(mask_.count()));
+  mask = F<op::threshold>(ASingleton<Random<cpu>>::Instance()\
+      ->uniform(mask.shape), pkeep ) * (1.0f/pkeep);
+  Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
+  Blob<float>* srcblob=srclayers[0]->mutable_data();
+  Tensor<cpu, 1> src(srcblob->mutable_cpu_data(), Shape1(srcblob->count()));
+  data=src*mask;
+}
+
+void DropoutLayer::ComputeGradient(const vector<SLayer>& srclayers)  {
+  Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(data_.count()));
+  Tensor<cpu, 1> mask(mask_.mutable_cpu_data(), Shape1(mask_.count()));
+  Blob<float>* gsrcblob=srclayers[0]->mutable_grad();
+  Tensor<cpu, 1> gsrc(gsrcblob->mutable_cpu_data(), Shape1(gsrcblob->count()));
+  gsrc=grad*mask;
+}
+/**************** Implementation for InnerProductLayer********************/
+void InnerProductLayer::Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  const auto& src=srclayers[0]->data(this);
+  batchsize_=src.shape()[0];
+  vdim_=src.count()/batchsize_;
+  hdim_=proto.inner_product_param().num_output();
+  data_.Reshape(vector<int>{batchsize_, hdim_});
+  grad_.ReshapeLike(data_);
+  Factory<Param>* factory=Singleton<Factory<Param>>::Instance();
+  weight_=shared_ptr<Param>(factory->Create("Param"));
+  bias_=shared_ptr<Param>(factory->Create("Param"));
+  weight_->Setup(proto.param(0), vector<int>{vdim_, hdim_}, vdim_*hdim_);
+  bias_->Setup(proto.param(1), vector<int>{hdim_},0);
+}
+void InnerProductLayer::SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){
+  LayerProto newproto(proto);
+  InnerProductProto * innerproto=newproto.mutable_inner_product_param();
+  innerproto->set_num_output(shape[1]);
+  Setup(newproto, srclayers);
+}
+
+void InnerProductLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers) {
+  Tensor<cpu, 2> data(data_.mutable_cpu_data(), Shape2(batchsize_,hdim_));
+  CHECK_EQ(srclayers[0]->data().count(), batchsize_*vdim_);
+  Tensor<cpu, 2> src(srclayers[0]->mutable_data()->mutable_cpu_data(),
+      Shape2(batchsize_,vdim_));
+  Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), Shape2(vdim_,hdim_));
+  Tensor<cpu, 1> bias(bias_->mutable_cpu_data(), Shape1(hdim_));
+  data=dot(src, weight);
+  // repmat: repeat bias vector into batchsize rows
+  data+=repmat(bias, batchsize_);
+}
+
+void InnerProductLayer::ComputeGradient(const vector<SLayer>& srclayers) {
+  Tensor<cpu, 2> src(srclayers[0]->mutable_data()->mutable_cpu_data(),
+      Shape2(batchsize_,vdim_));
+  Tensor<cpu, 2> grad(grad_.mutable_cpu_data(),Shape2(batchsize_,hdim_));
+  Tensor<cpu, 2> weight(weight_->mutable_cpu_data(), Shape2(vdim_,hdim_));
+  Tensor<cpu, 2> gweight(weight_->mutable_cpu_grad(), Shape2(vdim_,hdim_));
+  Tensor<cpu, 1> gbias(bias_->mutable_cpu_grad(), Shape1(hdim_));
+
+  gbias=sum_rows(grad);
+  gweight=dot(src.T(), grad);
+  if(srclayers[0]->mutable_grad(this)!=nullptr){
+    Tensor<cpu, 2> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),
+        Shape2(batchsize_,vdim_));
+    gsrc=dot(grad, weight.T());
+  }
+}
+/*****************************************************************************
+ * Implementation for LabelLayer
+ *****************************************************************************/
+void LabelLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize();
+  data_.Reshape(vector<int>{batchsize});
+}
+
+void LabelLayer::ParseRecords(bool training, const vector<Record>& records, Blob<float>* blob){
+  LOG_IF(ERROR, records.size()==0)<<"Empty records to parse";
+  float *label= blob->mutable_cpu_data() ;
+  int rid=0;
+  for(const Record& record: records){
+    label[rid++]=record.image().label();
+    CHECK_LT(record.image().label(),10);
+  }
+  CHECK_EQ(rid, blob->shape()[0]);
+}
+
+
+/*********************LMDBDataLayer**********************************/
+void LMDBDataLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){
+  if(random_skip_){
+    int nskip=rand()%random_skip_;
+    int n=0;
+    CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+          &mdb_value_, MDB_FIRST), MDB_SUCCESS);
+    while (mdb_cursor_get(mdb_cursor_, &mdb_key_,
+          &mdb_value_, MDB_NEXT) == MDB_SUCCESS)
+      n++;
+    LOG(INFO)<<"Random Skip "<<nskip<<" records of total "<<n<<"records";
+    // We have reached the end. Restart from the first.
+    CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+          &mdb_value_, MDB_FIRST), MDB_SUCCESS);
+    for(int i=0;i<nskip;i++){
+      if (mdb_cursor_get(mdb_cursor_, &mdb_key_,
+            &mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
+        // We have reached the end. Restart from the first.
+        DLOG(INFO) << "Restarting data prefetching from start.";
+        CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+              &mdb_value_, MDB_FIRST), MDB_SUCCESS);
+      }
+    }
+    random_skip_=0;
+  }
+  Datum datum;
+  for(auto& record: records_){
+    SingleLabelImageRecord* image=record.mutable_image();
+    CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+          &mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS);
+    datum.ParseFromArray(mdb_value_.mv_data, mdb_value_.mv_size);
+    ConvertDatumToSingleLableImageRecord(datum, image);
+    if (mdb_cursor_get(mdb_cursor_, &mdb_key_,
+          &mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
+      // We have reached the end. Restart from the first.
+      DLOG(INFO) << "Restarting data prefetching from start.";
+      CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+            &mdb_value_, MDB_FIRST), MDB_SUCCESS);
+    }
+  }
+}
+
+void LMDBDataLayer::ConvertDatumToSingleLableImageRecord(const Datum& datum,
+    SingleLabelImageRecord* record){
+  record->set_label(datum.label());
+  record->clear_shape();
+  if(datum.has_channels())
+    record->add_shape(datum.channels());
+  if(datum.has_height())
+    record->add_shape(datum.height());
+  if(datum.has_width())
+    record->add_shape(datum.width());
+  if(datum.has_data())
+    record->set_pixel(datum.data());
+  if(datum.float_data_size()){
+    record->clear_data();
+    for(float x: datum.float_data())
+      record->add_data(x);
+  }
+}
+
+void LMDBDataLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(mdb_env_create(&mdb_env_), MDB_SUCCESS) << "mdb_env_create failed";
+  CHECK_EQ(mdb_env_set_mapsize(mdb_env_, 1099511627776), MDB_SUCCESS); // 1TB
+  CHECK_EQ(mdb_env_open(mdb_env_,
+        proto.data_param().path().c_str(),
+        MDB_RDONLY, 0664), MDB_SUCCESS) << "cannot open lmdb "
+    << proto.data_param().path();
+  CHECK_EQ(mdb_txn_begin(mdb_env_, NULL, MDB_RDONLY, &mdb_txn_), MDB_SUCCESS)
+    << "mdb_txn_begin failed";
+  CHECK_EQ(mdb_open(mdb_txn_, NULL, 0, &mdb_dbi_), MDB_SUCCESS)
+    << "mdb_open failed";
+  CHECK_EQ(mdb_cursor_open(mdb_txn_, mdb_dbi_, &mdb_cursor_), MDB_SUCCESS)
+    << "mdb_cursor_open failed";
+  LOG(INFO) << "Opening lmdb " << proto.data_param().path();
+  CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_FIRST),
+      MDB_SUCCESS) << "mdb_cursor_get failed";
+
+  if (mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_NEXT)
+      != MDB_SUCCESS) {
+    CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_,
+          MDB_FIRST), MDB_SUCCESS);
+  }
+  Datum datum;
+  datum.ParseFromArray(mdb_value_.mv_data, mdb_value_.mv_size);
+  SingleLabelImageRecord* record=sample_.mutable_image();
+  ConvertDatumToSingleLableImageRecord(datum, record);
+
+  batchsize_=proto.data_param().batchsize();
+  records_.resize(batchsize_);
+  random_skip_=proto.data_param().random_skip();
+}
+
+/***************** Implementation for LRNLayer *************************/
+void LRNLayer::Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  lsize_ = proto.lrn_param().local_size();
+  CHECK_EQ(lsize_ % 2, 1) << "LRN only supports odd values for Localvol";
+  knorm_=proto.lrn_param().knorm();
+  alpha_ = proto.lrn_param().alpha();
+  beta_ = proto.lrn_param().beta();
+
+  const vector<int>& s=srclayers[0]->data(this).shape();
+  data_.Reshape(s);
+  grad_.Reshape(s);
+  norm_.Reshape(s);
+  batchsize_=s[0];
+  channels_=s[1];
+  height_=s[2];
+  width_=s[3];
+}
+
+void LRNLayer::SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){
+  Setup(proto, srclayers);
+}
+
+void LRNLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){
+  const float salpha = alpha_ / lsize_;
+  Shape<4> s=Shape4(batchsize_,channels_, height_, width_);
+  Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(), s);
+  Tensor<cpu, 4> data(data_.mutable_cpu_data(), s);
+  Tensor<cpu, 4> norm(norm_.mutable_cpu_data(), s);
+  // stores normalizer without power
+  norm= chpool<red::sum>( F<op::square>(src) , lsize_ ) * salpha + knorm_;
+  data = src * F<op::power>(norm, -beta_ );
+}
+
+void LRNLayer::ComputeGradient(const vector<SLayer>& srclayers) {
+  const float salpha = alpha_ / lsize_;
+  Shape<4> s=Shape4(batchsize_,channels_, height_, width_);
+  Tensor<cpu, 4> src(srclayers[0]->mutable_data()->mutable_cpu_data(), s);
+  Tensor<cpu, 4> norm(norm_.mutable_cpu_data(), s);
+  Tensor<cpu, 4> grad(grad_.mutable_cpu_data(), s);
+  Tensor<cpu, 4> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(), s);
+
+  gsrc = grad * F<op::power>( norm, -beta_ );
+  gsrc += ( - 2.0f * beta_ * salpha ) * chpool<red::sum>(
+      grad * src * F<op::power>( norm, -beta_-1.0f ), lsize_ )  * src;
+}
+
+/**************** Implementation for MnistImageLayer******************/
+
+void MnistImageLayer::ParseRecords(bool training, const vector<Record>& records,
+    Blob<float>* blob){
+  LOG_IF(ERROR, records.size()==0)<<"Empty records to parse";
+  int ndim=records.at(0).image().shape_size();
+  int inputsize =records.at(0).image().shape(ndim-1);
+
+  float* dptr=blob->mutable_cpu_data();
+  for(const Record& record: records){
+    // copy from record to cv::Mat
+    cv::Mat input(inputsize, inputsize, CV_32FC1);
+    const SingleLabelImageRecord& imagerecord=record.image();
+    if(imagerecord.pixel().size()){
+      string pixel=imagerecord.pixel();
+      for(int i=0,k=0;i<inputsize;i++)
+        for(int j=0;j<inputsize;j++)
+          // NOTE!!! must cast pixel to uint8_t then to float!!! waste a lot of
+          // time to debug this
+          input.at<float>(i,j)=static_cast<float>(static_cast<uint8_t>(pixel[k++]));
+    }else{
+      for(int i=0,k=0;i<inputsize;i++)
+        for(int j=0;j<inputsize;j++)
+          input.at<float>(i,j)=imagerecord.data(k++);
+    }
+    int size=blob->shape()[1];
+    /*
+    cv::Mat resizeMat=input;
+    // affine transform, scaling, rotation and shearing
+    if(gamma_){
+      float r1=rand_real()*2-1;
+      float r2=rand_real()*2-1;
+      int h=static_cast<int>(inputsize*(1.+r1*gamma_/100.0));
+      int w=static_cast<int>(inputsize*(1.+r2*gamma_/100.0));
+      cv::resize(input, resizeMat, cv::Size(h,w));
+    }
+    cv::Mat betaMat=resizeMat;
+    cv::Mat warpmat(2,3, CV_32FC1);
+    warpmat.at<float>(0,0)=1.0;
+    warpmat.at<float>(0,1)=0.0;
+    warpmat.at<float>(0,2)=0.0;
+    warpmat.at<float>(1,0)=0.0;
+    warpmat.at<float>(1,1)=1.0;
+    warpmat.at<float>(1,2)=0.0;
+
+    if(beta_){
+      float r=rand_real()*2-1;
+      if(rand() % 2){ // rotation
+        cv::Point center(resizeMat.rows/2, resizeMat.cols/2);
+        warpmat=cv::getRotationMatrix2D(center, r*beta_, 1.0);
+      }else{
+        //shearing
+        warpmat.at<float>(0,1)=r*beta_/90;
+        if(imagerecord.label()==1 ||imagerecord.label()==7)
+          warpmat.at<float>(0,1)/=2.0;
+      }
+    }
+    cv::warpAffine(resizeMat, betaMat, warpmat, cv::Size(size, size));
+    */
+
+    for(int i=0;i<size;i++){
+      for(int j=0;j<size;j++){
+        *dptr=input.at<float>(i,j)/norm_a_-norm_b_;
+        dptr++;
+      }
+    }
+  }
+  CHECK_EQ(dptr, blob->mutable_cpu_data()+blob->count());
+}
+void MnistImageLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize();
+  Record sample=static_cast<DataLayer*>(srclayers[0].get())->sample();
+  kernel_=proto.mnist_param().kernel();
+  sigma_=proto.mnist_param().sigma();
+  alpha_=proto.mnist_param().alpha();
+  beta_=proto.mnist_param().beta();
+  gamma_=proto.mnist_param().gamma();
+  resize_=proto.mnist_param().resize();
+  norm_a_=proto.mnist_param().norm_a();
+  norm_b_=proto.mnist_param().norm_b();
+  elastic_freq_=proto.mnist_param().elastic_freq();
+
+  int ndim=sample.image().shape_size();
+  CHECK_GE(ndim,2);
+  if(resize_)
+    data_.Reshape(vector<int>{batchsize, resize_, resize_});
+  else{
+    int s=sample.image().shape(ndim-1);
+    CHECK_EQ(s,sample.image().shape(ndim-2));
+    data_.Reshape(vector<int>{batchsize, s, s });
+  }
+}
+
+/******************** Implementation for PoolingLayer******************/
+void PoolingLayer::Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  PoolingProto pool_param = proto.pooling_param();
+  kernel_=pool_param.kernel();
+  stride_=pool_param.stride();
+  CHECK_LT(pad_, kernel_);
+  pool_=proto.pooling_param().pool();
+  CHECK(pool_ == PoolingProto_PoolMethod_AVE
+        || pool_ == PoolingProto_PoolMethod_MAX)
+      << "Padding implemented only for average and max pooling.";
+
+  const auto& srcshape=srclayers[0]->data(this).shape();
+  int dim=srcshape.size();
+  CHECK_GT(dim,2);
+  width_ = srcshape[dim-1];
+  height_ = srcshape[dim-2];
+  if(dim>3)
+    channels_ = srcshape[dim-3];
+  else
+    channels_=1;
+  batchsize_=srcshape[0];
+  pooled_height_ = static_cast<int>((height_ - kernel_) / stride_) + 1;
+  pooled_width_ = static_cast<int>(( width_ - kernel_) / stride_) + 1;
+  data_.Reshape(vector<int>{batchsize_, channels_, pooled_height_, pooled_width_});
+  grad_.ReshapeLike(data_);
+}
+
+void PoolingLayer::SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){
+  Setup(proto, srclayers);
+}
+
+void PoolingLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){
+  Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
+      Shape4(batchsize_, channels_, height_, width_));
+  Tensor<cpu, 4> data(data_.mutable_cpu_data(),
+      Shape4(batchsize_, channels_, pooled_height_, pooled_width_));
+  if(pool_ == PoolingProto_PoolMethod_MAX)
+    data=pool<red::maximum>(src, kernel_, stride_);
+  else if(pool_ == PoolingProto_PoolMethod_AVE)
+    data=pool<red::sum>(src, kernel_, stride_)
+      *(1.0f/(kernel_*kernel_));
+}
+
+/*
+ * partition only on num/channel dim
+ * assume grad and data have the same paritition
+ */
+void PoolingLayer::ComputeGradient(const vector<SLayer>& srclayers) {
+  Shape<4> s1= Shape4(batchsize_, channels_, height_, width_);
+  Tensor<cpu, 4> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),s1);
+  Tensor<cpu, 4> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),s1);
+  Shape<4> s2= Shape4(batchsize_, channels_, pooled_height_, pooled_width_);
+  Tensor<cpu, 4> data(data_.mutable_cpu_data(), s2);
+  Tensor<cpu, 4> grad(grad_.mutable_cpu_data(), s2);
+  if(pool_ == PoolingProto_PoolMethod_MAX)
+      gsrc = unpool<red::maximum>(src, data, grad, kernel_, stride_);
+  else if(pool_ == PoolingProto_PoolMethod_AVE)
+      gsrc = unpool<red::sum>(src, data, grad, kernel_, stride_)
+        *(1.0f/(kernel_*kernel_));
+}
+
+/***************** Implementation for ReLULayer *****************************/
+
+void ReLULayer::Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers){
+  data_.ReshapeLike(srclayers[0]->data());
+  grad_.ReshapeLike(*(srclayers[0]->mutable_grad()));
+}
+
+void ReLULayer::SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){
+  Setup(proto, srclayers);
+}
+
+void ReLULayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){
+  Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
+  Tensor<cpu, 1> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
+      Shape1(data_.count()));
+  data=F<op::relu>(src);
+}
+
+void ReLULayer::ComputeGradient(const vector<SLayer>& srclayers) {
+  Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(grad_.count()));
+  Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
+  Tensor<cpu, 1> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),
+      Shape1(data_.count()));
+  gsrc=F<op::relu_grad>(data)*grad;
+}
+
+/*************** Implementation for RGBImageLayer *************************/
+
+void RGBImageLayer::ParseRecords(bool training, const vector<Record>& records,
+    Blob<float>* blob){
+  LOG_IF(ERROR, records.size()==0)<<"Empty records to parse";
+  const vector<int>& s=blob->shape();
+  Tensor<cpu, 4> images(blob->mutable_cpu_data(), Shape4(s[0],s[1],s[2],s[3]));
+  const SingleLabelImageRecord& r=records.at(0).image();
+  Tensor<cpu, 3> raw_image(Shape3(r.shape(0),r.shape(1),r.shape(2)));
+  AllocSpace(raw_image);
+  Tensor<cpu, 3> croped_image(Shape3(s[1],s[2],s[3]));
+  if(cropsize_)
+    AllocSpace(croped_image);
+    //CHECK(std::equal(croped_image.shape(), raw_image.shape());
+  int rid=0;
+  const float* meandptr=mean_.cpu_data();
+  for(const Record& record: records){
+    auto image=images[rid];
+    bool do_crop=cropsize_>0&&training;
+    bool do_mirror=mirror_&&rand()%2&&training;
+    float* dptr=nullptr;
+    if(do_crop||do_mirror)
+      dptr=raw_image.dptr;
+    else
+      dptr=image.dptr;
+    if(record.image().pixel().size()){
+      string pixel=record.image().pixel();
+      for(size_t i=0;i<pixel.size();i++)
+        dptr[i]=static_cast<float>(static_cast<uint8_t>(pixel[i]));
+    }else {
+      memcpy(dptr, record.image().data().data(),
+          sizeof(float)*record.image().data_size());
+    }
+    for(int i=0;i<mean_.count();i++)
+      dptr[i]-=meandptr[i];
+
+    if(do_crop){
+      int hoff=rand()%(r.shape(1)-cropsize_);
+      int woff=rand()%(r.shape(2)-cropsize_);
+      Shape<2> cropshape=Shape2(cropsize_, cropsize_);
+      if(do_mirror){
+        croped_image=crop(raw_image, cropshape, hoff, woff);
+        image=mirror(croped_image);
+      }else{
+        image=crop(raw_image, cropshape, hoff, woff);
+      }
+    }else if(do_mirror){
+      image=mirror(raw_image);
+    }
+    rid++;
+  }
+  if(scale_)
+    images=images*scale_;
+
+  FreeSpace(raw_image);
+  if(cropsize_)
+    FreeSpace(croped_image);
+}
+void RGBImageLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),1);
+  scale_=proto.rgbimage_param().scale();
+  cropsize_=proto.rgbimage_param().cropsize();
+  mirror_=proto.rgbimage_param().mirror();
+  int batchsize=static_cast<DataLayer*>(srclayers[0].get())->batchsize();
+  Record sample=static_cast<DataLayer*>(srclayers[0].get())->sample();
+  vector<int> shape;
+  shape.push_back(batchsize);
+  for(int x: sample.image().shape())
+    shape.push_back(x);
+  CHECK_EQ(shape.size(),4);
+  if(cropsize_){
+    shape[2]=cropsize_;
+    shape[3]=cropsize_;
+  }
+  data_.Reshape(shape);
+  mean_.Reshape({shape[1],shape[2],shape[3]});
+  if(proto.rgbimage_param().has_meanfile()){
+    BlobProto tmp;
+    ReadProtoFromBinaryFile(proto.rgbimage_param().meanfile().c_str(), &tmp);
+    CHECK_EQ(mean_.count(), tmp.data_size());
+    memcpy(mean_.mutable_cpu_data(), tmp.data().data(), sizeof(float)*tmp.data_size());
+  }else{
+    memset(mean_.mutable_cpu_data(),0,sizeof(float)*mean_.count());
+  }
+}
+
+/***************Implementation for ShardDataLayer**************************/
+void ShardDataLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){
+  if(random_skip_){
+    int nskip=rand()%random_skip_;
+    LOG(INFO)<<"Random Skip "<<nskip<<" records, there are "<<shard_->Count()
+      <<" records in total";
+    string key;
+    for(int i=0;i<nskip;i++){
+      shard_->Next(&key, &sample_);
+    }
+    random_skip_=0;
+  }
+  for(auto& record: records_){
+    string key;
+    shard_->Next(&key, &record);
+  }
+}
+
+void ShardDataLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  shard_= std::make_shared<DataShard>(proto.data_param().path(),
+      DataShard::kRead);
+  string key;
+  shard_->Next(&key, &sample_);
+  batchsize_=proto.data_param().batchsize();
+
+  records_.resize(batchsize_);
+  random_skip_=proto.data_param().random_skip();
+}
+/*******************Implementation of TanLayer***************************/
+void TanhLayer::Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers){
+  data_.ReshapeLike(srclayers[0]->data(this));
+  grad_.ReshapeLike(srclayers[0]->grad(this));
+}
+
+void TanhLayer::SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){
+  Setup(proto, srclayers);
+}
+
+
+void TanhLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers){
+  Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
+  Tensor<cpu, 1> src(srclayers[0]->mutable_data(this)->mutable_cpu_data(),
+      Shape1(data_.count()));
+  data=F<op::stanh>(src);
+}
+
+void TanhLayer::ComputeGradient(const vector<SLayer>& srclayers) {
+  Tensor<cpu, 1> data(data_.mutable_cpu_data(), Shape1(data_.count()));
+  Tensor<cpu, 1> grad(grad_.mutable_cpu_data(), Shape1(grad_.count()));
+  Tensor<cpu, 1> gsrc(srclayers[0]->mutable_grad(this)->mutable_cpu_data(),
+      Shape1(data_.count()));
+  gsrc=F<op::stanh_grad>(data)*grad;
+}
+/********** * Implementation for SoftmaxLossLayer*************************/
+void SoftmaxLossLayer::Setup(const LayerProto& proto,
+    const vector<SLayer>& srclayers){
+  CHECK_EQ(srclayers.size(),2);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  batchsize_=data_.shape()[0];
+  dim_=data_.count()/batchsize_;
+  topk_=proto.softmaxloss_param().topk();
+  metric_.Reshape(vector<int>{2});
+  scale_=proto.softmaxloss_param().scale();
+}
+void SoftmaxLossLayer::SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){
+  Setup(proto, srclayers);
+}
+void SoftmaxLossLayer::ComputeFeature(bool training, const vector<SLayer>& srclayers) {
+  Shape<2> s=Shape2(batchsize_, dim_);
+  Tensor<cpu, 2> prob(data_.mutable_cpu_data(), s);
+  Tensor<cpu, 2> src(srclayers[0]->mutable_data()->mutable_cpu_data(), s);
+  Softmax(prob, src);
+  const float* label=srclayers[1]->data().cpu_data();
+  const float* probptr=prob.dptr;
+  float loss=0, precision=0;
+  for(int n=0;n<batchsize_;n++){
+    int ilabel=static_cast<int>(label[n]);
+    CHECK_LT(ilabel,10);
+    CHECK_GE(ilabel,0);
+    float prob_of_truth=probptr[ilabel];
+    loss-=log(std::max(prob_of_truth, FLT_MIN));
+    vector<std::pair<float, int> > probvec;
+    for (int j = 0; j < dim_; ++j) {
+      probvec.push_back(std::make_pair(probptr[j], j));
+    }
+    std::partial_sort(
+        probvec.begin(), probvec.begin() + topk_,
+        probvec.end(), std::greater<std::pair<float, int> >());
+    // check if true label is in top k predictions
+    for (int k = 0; k < topk_; k++) {
+      if (probvec[k].second == static_cast<int>(label[n])) {
+        precision++;
+        break;
+      }
+    }
+    probptr+=dim_;
+  }
+  CHECK_EQ(probptr, prob.dptr+prob.shape.Size());
+  float *metric=metric_.mutable_cpu_data();
+  metric[0]=loss*scale_/(1.0f*batchsize_);
+  metric[1]=precision*scale_/(1.0f*batchsize_);
+}
+
+void SoftmaxLossLayer::ComputeGradient(const vector<SLayer>& srclayers) {
+  const float* label=srclayers[1]->data().cpu_data();
+  Blob<float>* gsrcblob=srclayers[0]->mutable_grad();
+  gsrcblob->CopyFrom(data_);
+  float* gsrcptr=gsrcblob->mutable_cpu_data();
+  for(int n=0;n<batchsize_;n++){
+    gsrcptr[n*dim_+static_cast<int>(label[n])]-=1.0f;
+  }
+  Tensor<cpu, 1> gsrc(gsrcptr, Shape1(gsrcblob->count()));
+  gsrc*=scale_/(1.0f*batchsize_);
+}
+
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/neuralnet/neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuralnet.cc b/src/neuralnet/neuralnet.cc
new file mode 100644
index 0000000..0bca26e
--- /dev/null
+++ b/src/neuralnet/neuralnet.cc
@@ -0,0 +1,401 @@
+#include <algorithm>
+#include <queue>
+
+#include "neuralnet/neuralnet.h"
+#include "utils/singleton.h"
+#include "utils/factory.h"
+#include "utils/graph.h"
+
+
+namespace singa {
+#define CreateLayer(id) CreateInstance(id, Layer)
+
+void NeuralNet::RegisterLayers(){
+  Factory<Layer>* factory=Singleton<Factory<Layer>>::Instance();
+  factory->Register("kConvolution", CreateLayer(ConvolutionLayer));
+  factory->Register("kConcate", CreateLayer(ConcateLayer));
+  factory->Register("kDropout", CreateLayer(DropoutLayer));
+  factory->Register("kInnerProduct", CreateLayer(InnerProductLayer));
+  factory->Register("kRGBImage", CreateLayer(RGBImageLayer));
+  factory->Register("kLabel", CreateLayer(LabelLayer));
+  factory->Register("kLMDBData", CreateLayer(LMDBDataLayer));
+  factory->Register("kLRN", CreateLayer(LRNLayer));
+  factory->Register("kMnistImage", CreateLayer(MnistImageLayer));
+  factory->Register("kBridgeDst", CreateLayer(BridgeDstLayer));
+  factory->Register("kBridgeSrc", CreateLayer(BridgeSrcLayer));
+  factory->Register("kPooling", CreateLayer(PoolingLayer));
+  factory->Register("kReLU", CreateLayer(ReLULayer));
+  factory->Register("kShardData", CreateLayer(ShardDataLayer));
+  factory->Register("kSlice", CreateLayer(SliceLayer));
+  factory->Register("kSoftmaxLoss", CreateLayer(SoftmaxLossLayer));
+  factory->Register("kSplit", CreateLayer(SplitLayer));
+  factory->Register("kTanh", CreateLayer(TanhLayer));
+}
+shared_ptr<NeuralNet> NeuralNet::SetupNeuralNet(const NetProto& np, Phase phase){
+  NetProto proto;
+  proto.set_partition_type(np.partition_type());
+  // exclude layers if necessary
+  for(auto& layer:np.layer()){
+    bool include=true;
+    for(int x: layer.exclude()){
+      if(x==phase)
+        include=false;
+    }
+    if(include){
+      LayerProto* lp=proto.add_layer();
+      lp->CopyFrom(layer);
+    }
+  }
+  LOG(INFO)<<"NeuralNet config is "<<proto.DebugString();
+  shared_ptr<NeuralNet> net(new NeuralNet(proto));
+  return net;
+}
+NeuralNet::NeuralNet(NetProto net_proto, int group_size) {
+  group_size_=group_size;
+  for(int i=0;i<net_proto.layer_size();i++){
+    LayerProto * layer_proto=net_proto.mutable_layer(i);
+    if(!layer_proto->has_partition_type())
+      layer_proto->set_partition_type(net_proto.partition_type());
+  }
+
+  LOG(INFO)<<"Construct Neural Net...";
+  ConstructNeuralNet(net_proto);
+  if(group_size_>1)
+    PartitionNeuralNet();
+  for(auto layer: layers_){
+    DLOG(INFO)<<layer->name();
+  }
+  // assign id for params;
+  int paramid=0;
+  for(auto& layer: layers_){
+    for(shared_ptr<Param> p: layer->GetParams()){
+      params_.push_back(p);
+      p->set_id(paramid++);
+    }
+  }
+
+  LOG(INFO)<<"Neural Net constructed";
+}
+
+void NeuralNet::ConstructNeuralNet(const NetProto& net_proto){
+  // construct graph, one node for one layer, identified by layer name
+  map<string, LayerProto> protos;
+  for (auto &layer_proto : net_proto.layer()){
+    graph_.AddNode(layer_proto.name());
+    protos[layer_proto.name()]=layer_proto;
+  }
+  for (auto &layer_proto : net_proto.layer())
+    if(layer_proto.srclayers_size())
+      for(const string& src: layer_proto.srclayers())
+        graph_.AddEdge(src, layer_proto.name());
+
+  // topology sort
+  graph_.Sort();
+  //DLOG(INFO)<<"pure graph without partition\n"<< graph_.ToString();
+
+  auto* factory=Singleton<Factory<Layer>>::Instance();
+  // create Layers according to topology order
+  for(SNode node: graph_.nodes()){
+    shared_ptr<Layer> layer(factory->Create(protos[node->name()].type()));
+    layer->Init(protos[node->name()]);
+    name2layer_[node->name()]=layer;
+    layers_.push_back(layer);
+  }
+
+  // connect Layers.
+  for(SNode node: graph_.nodes()){
+    auto layer=name2layer_[node->name()];
+    for(SNode dst: node->dstnodes())
+      layer->AddDstLayer(name2layer_[dst->name()]);
+    for(SNode src: node->srcnodes())
+      layer->AddSrcLayer(name2layer_[src->name()]);
+  }
+  // setup layer properties, e.g., shapes
+  for(auto& layer: layers_){
+      layer->Setup();
+  }
+  LOG(INFO)<<"network graph witout partition\n"<<ToString();
+}
+
+void NeuralNet::PartitionNeuralNet(){
+  graph_=CreatePartitonedGraph(layers_, name2layer_);
+  //DLOG(ERROR)<<"pure graph after partition\n"<<graph_.ToString();
+  map<string, shared_ptr<Layer>> name2layer(name2layer_);
+  name2layer_.clear();
+  layers_.clear();
+  int gsize=group_size_;
+  auto* factory=Singleton<Factory<Layer>>::Instance();
+  // create Layers according to topology order
+  for(SNode node: graph_.nodes()){
+    LayerProto proto;
+    proto.set_name(node->name());
+    proto.set_locationid(node->val().locationid);
+    proto.set_partitionid(node->val().partitionid);
+    const string& origin=node->val().origin;
+    if (origin=="kSlice"){
+      proto.set_type(origin);
+      SliceProto *slice=proto.mutable_slice_param();
+      slice->set_slice_dimension(node->val().slice_dimension);
+      slice->set_slice_num(node->dstnodes().size());
+    }else if(origin== "kConcate"){
+      proto.set_type(origin);
+      ConcateProto *concate=proto.mutable_concate_param();
+      concate->set_concate_dimension(node->val().concate_dimension);
+      concate->set_concate_num(node->srcnodes().size());
+    }else if(origin=="kSplit"){
+      proto.set_type(origin);
+      SplitProto *split=proto.mutable_split_param();
+      split->set_num_splits(node->dstnodes().size());
+    }else if(origin=="kBridgeSrc" || origin== "kBridgeDst"){
+      proto.set_type(origin);
+    }else{
+      CHECK(name2layer.find(node->val().origin)!=name2layer_.end())
+        <<"Unkown origin for node "<<node->val().origin;
+    }
+    shared_ptr<Layer> newlayer;
+    if(proto.has_type()){
+      // layers added due to partition
+      shared_ptr<Layer> layer(factory->Create(proto.type()));
+      layer->Init(proto);
+      newlayer=layer;
+    }else{
+      // partitioned layers from origin neuralnet
+      auto oldlayer=name2layer.at(node->val().origin);
+      vector<int> shape=oldlayer->shape(nullptr);
+      if(oldlayer->partition_type()==kNone){
+        newlayer=oldlayer;
+      } else{
+        int pdim=oldlayer->partition_dimension();
+        shape[pdim]=shape[pdim]/gsize+
+          ((node->val().partitionid==gsize-1)?shape[pdim]%gsize:0);
+        shared_ptr<Layer> layer(factory->Create(oldlayer->type()));
+        layer->Init(*oldlayer, shape);
+        layer->set_name(node->name());
+        newlayer=layer;
+      }
+    }
+    layers_.push_back(newlayer);
+    name2layer_[node->name()]=newlayer;
+  }
+
+  // connect Layers.
+  for(SNode node: graph_.nodes()){
+    auto layer=name2layer_[node->name()];
+    layer->ClearDstLayers();
+    for(SNode dst: node->dstnodes())
+      layer->AddDstLayer(name2layer_[dst->name()]);
+    layer->ClearSrcLayers();
+    for(SNode src: node->srcnodes())
+      layer->AddSrcLayer(name2layer_[src->name()]);
+  }
+
+  LOG(INFO)<<"Adjacency matrix\n"<<ToAdjacency();
+
+  // set up layers after
+  for(shared_ptr<Layer> layer: layers_){
+    const vector<int>& shape=layer->shape(nullptr);
+    layer->SetupAfterPartition();
+    const vector<int>& newshape=layer->shape(nullptr);
+    if(shape.size())
+      CHECK(std::equal(shape.begin(),shape.end(),newshape.begin()));
+  }
+
+  LOG(INFO)<<"network graph after partition layers\n"<<ToString();
+}
+
+Graph NeuralNet::CreatePartitonedGraph(const vector<shared_ptr<Layer>>& layers,
+    const map<string, shared_ptr<Layer>>& name2layer){
+  Graph graph;
+  // partition origin nodes/layers
+  map<string, vector<SNode>> layer2nodes; //from name of original layer to nodes
+  int gsize=group_size_;
+  for(const auto& layer: layers){
+    vector<SNode> nodes;
+    if(layer->partition_type()==kDataPartition||
+        layer->partition_type()==kLayerPartition){
+      char suffix[4];
+      for(int i=0;i<gsize;i++){
+        sprintf(suffix, "%02d", i);
+        // differentiate partitions
+        string nodename=layer->name()+"-"+string(suffix);
+        LayerInfo info;
+        auto node=graph.AddNode(nodename, LayerInfo{layer->name(),i, i,-1,-1});
+        nodes.push_back(node);
+      }
+    }else if(layer->partition_type()==kNone){
+      auto node=graph.AddNode(layer->name(),
+          LayerInfo{layer->name(), layer->locationid(), 0,-1,-1});
+      nodes.push_back(node);
+    }else{
+      LOG(FATAL)<<"Unknown partition type "<<layer->partition_type();
+    }
+    layer2nodes[layer->name()]=nodes;
+  }
+
+
+  // connect nodes, nodes for ConcateLayer and SliceLayer are added.
+  for(shared_ptr<Layer> layer: layers){
+    string name=layer->name();
+    PartitionType type=layer->partition_type();
+    const vector<SNode>& nodes=layer2nodes.at(name);
+    for(int srcid=0;srcid<layer->srclayers_size();srcid++){
+      shared_ptr<Layer> srclayer=layer->srclayers()[srcid];
+      string srcname=srclayer->name();
+      const vector<SNode> srcnodes=layer2nodes.at(srcname);
+      PartitionType srctype=srclayer->partition_type();
+      ConnectionType connection=layer->connection_type(srcid);
+      if(srctype==kNone){
+        CHECK_EQ(srcnodes.size(),1)
+          <<"local layer "<<srcname<<" should not be partitioned";
+        SNode srcnode=srcnodes[0];
+        if(type==kDataPartition||(type==kLayerPartition&&connection==kOneToOne)){
+          LayerInfo info=srcnode->val();
+          info.slice_dimension=name2layer.at(name)->partition_dimension();
+          graph.InsertSliceNode(srcnode, nodes, info);
+        } else if(type==kNone){
+          CHECK_EQ(nodes.size(),1)
+            <<"local layer "<<name<<" should not be nodeed";
+          graph.AddEdge(srcnode, nodes[0]);
+        } else { // type==kLayerPartition&&connection==kOneToAll
+          graph.InsertSplitNode(srcnode, nodes);
+        }
+      }else if((type==kNone
+                &&(srctype==kDataPartition||srctype==kLayerPartition))
+               ||(type==kLayerPartition&&connection==kOneToAll&&
+                  (srctype==kDataPartition||srctype==kLayerPartition))){
+        // copy/concate the whole srclayer for every dst partition
+        for(SNode node:nodes){
+          LayerInfo info=node->val();
+          info.concate_dimension=name2layer.at(srcname)->partition_dimension();
+          CHECK_GE(info.concate_dimension,0);
+          graph.InsertConcateNode(srcnodes, node, info);
+        }
+      }else if((srctype==kLayerPartition&&type==kDataPartition)
+          || (srctype==kDataPartition&&type==kLayerPartition)){
+        // the most complext scenario
+        vector<SNode> slicenodes;
+        for(SNode srcnode: srcnodes){
+          LayerInfo info=srcnode->val();
+          info.slice_dimension=name2layer.at(name)->partition_dimension();
+          slicenodes.push_back(graph.InsertSliceNode(srcnode, nodes,
+              info, false));
+        }
+        for(SNode node: nodes){
+          LayerInfo info=node->val();
+          info.concate_dimension=name2layer.at(srcname)->partition_dimension();
+          CHECK_GE(info.concate_dimension,0);
+          graph.InsertConcateNode(slicenodes, node, info);
+        }
+      }else if((srctype==kDataPartition&&type==kDataPartition)||
+          (srctype==kLayerPartition&&type==kLayerPartition&&
+           layer->connection_type(srcid)==kOneToOne)){
+        CHECK_EQ(srcnodes.size(), nodes.size());
+        for(size_t i=0;i<srcnodes.size();i++){
+          graph.AddEdge(srcnodes[i], nodes[i]);
+        }
+      }
+    }
+  }
+  // must do topology sort, because we have added new nodes.
+  graph.Sort();
+  //LOG(ERROR)<<graph.ToString();
+
+  // add node for split layer
+  bool data_node=true;
+  vector<SNode> oldnodes=graph.nodes();
+  for(SNode node: oldnodes){
+    if(node->dstnodes_size()>1&&node->val().origin!="kSlice"
+        &&node->val().origin!="kSplit"&&!data_node){
+      vector<SNode> dstnodes=node->dstnodes();
+      for(SNode dst: dstnodes)
+        graph.RemoveEdge(node, dst);
+      graph.InsertSplitNode(node, dstnodes);
+    }
+    data_node=false;
+  }
+
+  // add bridge
+  oldnodes=graph.nodes();
+  for(SNode node: oldnodes){
+    vector<SNode> dstnodes=node->dstnodes();
+    for(size_t i=0;i<dstnodes.size();i++){
+      SNode dstnode=dstnodes.at(i);
+      if(node->val().locationid!=dstnode->val().locationid){
+        graph.RemoveEdge(node, dstnode);
+        graph.InsertBridgeNode(node, dstnode);
+      }
+    }
+  }
+  graph.Sort();
+  return graph;
+}
+
+std::string NeuralNet::ToString(){
+  map<string, string> info;
+  for(auto layer: layers_){
+    info[layer->name()]=IntVecToString(layer->shape(nullptr));
+    string type=layer->type();
+  }
+  return graph_.ToString(info);
+}
+
+std::string NeuralNet::ToAdjacency(){
+  string disp="";
+  for(auto& layer: layers_){
+    disp+=layer->name()+": ";
+    for(const auto& dst: layer->dstlayers())
+      disp+=dst->name()+", ";
+    disp+="\n";
+  }
+  return disp;
+}
+
+
+void NeuralNet::ToProto(NetProto *proto, bool copyData) {
+  proto->clear_layer();
+}
+
+string NeuralNet::DebugInfo(){
+  string ret;
+  char display[4096];
+  for(auto& layer: layers_){
+    if(!layer->is_datalayer()){
+      sprintf(display, "Forward layer  %10s data norm1 %13.9f\n",
+          layer->name().c_str(), layer->data().asum_data());
+      ret+=string(display);
+    }
+  }
+  for (auto it = layers_.rbegin(); it != layers_.rend(); it++){
+    shared_ptr<Layer> layer=*it;
+    if(!(layer->is_datalayer()||layer->is_losslayer()||layer->is_parserlayer())){
+      sprintf(display, "Backward layer %10s grad norm1 %13.9f\n",
+          layer->name().c_str(), layer->grad().asum_data());
+      ret+=string(display);
+    }
+  }
+  for(auto& layer: layers_){
+    for(auto param: layer->GetParams()){
+      sprintf(display, "Layer %10s, param id %2d, name %10s,\
+          value norm1 %13.9f, grad norm1 %13.9f\n",
+          layer->name().c_str(), param->id(), param->name().c_str(),
+          param->data().asum_data(), param->grad().asum_data());
+      ret+=string(display);
+    }
+  }
+  return ret;
+}
+void NeuralNet::ShareParams(shared_ptr<NeuralNet> other, int flag){
+  for(auto& layer: layers_){
+    auto otherlayer=other->name2layer(layer->name());
+    if(otherlayer!=nullptr){
+      const auto& otherparams=otherlayer->GetParams();
+      const auto& params=layer->GetParams();
+      CHECK_EQ(params.size(), otherparams.size());
+      for(size_t i=0;i<params.size();i++){
+        params[i]->ShareData(otherparams[i]);
+      }
+    }
+  }
+}
+
+}  // namespace singa


Mime
View raw message