singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wang...@apache.org
Subject [4/4] incubator-singa git commit: SINGA-21 Code Review
Date Sat, 20 Jun 2015 14:46:34 GMT
SINGA-21 Code Review

Clean up msg.h and msg.cc:
  -- remove the BaseMsg interface from msg.h
  -- move the zmq-related implementation to msg.cc
  -- reformat the code


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/767bad29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/767bad29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/767bad29

Branch: refs/heads/master
Commit: 767bad29e4362e14341dfdf4e921c07591f88c75
Parents: 0f3a8ff
Author: wang sheng <wangsheng1001@gmail.com>
Authored: Sat Jun 20 19:23:41 2015 +0800
Committer: wang sheng <wangsheng1001@gmail.com>
Committed: Sat Jun 20 19:23:41 2015 +0800

----------------------------------------------------------------------
 include/communication/msg.h    | 244 +++++++++++-------------------------
 include/communication/socket.h |   6 +-
 src/communication/msg.cc       |  47 ++++++-
 src/communication/socket.cc    |   4 +-
 4 files changed, 126 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/767bad29/include/communication/msg.h
----------------------------------------------------------------------
diff --git a/include/communication/msg.h b/include/communication/msg.h
index 61cbd01..ba7d064 100644
--- a/include/communication/msg.h
+++ b/include/communication/msg.h
@@ -1,196 +1,102 @@
-#ifndef INCLUDE_COMMUNICATION_MSG_H_
-#define INCLUDE_COMMUNICATION_MSG_H_
-#include <string>
+#ifndef SINGA_COMMUNICATION_MSG_H_
+#define SINGA_COMMUNICATION_MSG_H_
+
 #include <czmq.h>
 #include <glog/logging.h>
+#include <algorithm>
+#include <string>
 
-using std::string;
 namespace singa {
-class BaseMsg{
-  public:
-  /**
-    * Destructor to free memory
-    */
-  virtual ~BaseMsg(){};
+
+#define USE_ZMQ
+
+class Msg {
+ public:
+  Msg();
+  ~Msg();
+
   /**
     * @param first worker/server group id
-    * @param id worker/server id within the group
+    * @param second worker/server id within the group
     * @param flag 0 for server, 1 for worker, 2 for stub
     */
-  virtual void set_src(int first, int second, int flag)=0;
-  virtual void set_dst(int first, int second, int flag)=0;
-  virtual void set_src(int procs_id, int flag)=0;
-  virtual void set_dst(int procs_id, int flag)=0;
-  virtual int src_first() const=0;
-  virtual int dst_first() const=0;
-  virtual int src_second() const=0;
-  virtual int dst_second() const=0;
-  virtual int src_flag() const=0;
-  virtual int dst_flag() const=0;
-  virtual void set_type(int type)=0;
-  virtual int type() const=0;
-  virtual void set_target(int first, int second)=0;
-  virtual int target_first() const=0;
-  virtual int target_second() const=0;
-
+  inline void set_src(int first, int second, int flag) {
+    src_ = (first << kOff1) | (second << kOff2) | flag;
+  }
+  inline void set_dst(int first, int second, int flag) {
+    dst_ = (first << kOff1) | (second << kOff2) | flag;
+  }
+  inline void set_src(int procs_id, int flag) { set_src(procs_id, 0, flag); }
+  inline void set_dst(int procs_id, int flag) { set_dst(procs_id, 0, flag); }
+  inline int src() const { return src_; }
+  inline int dst() const { return dst_; }
+  inline int src_first() const { return src_ >> kOff1; }
+  inline int dst_first() const { return dst_ >> kOff1; }
+  inline int src_second() const { return (src_ & kMask1) >> kOff2; }
+  inline int dst_second() const { return (dst_ & kMask1) >> kOff2; }
+  inline int src_flag() const { return src_&kMask2; }
+  inline int dst_flag() const { return dst_&kMask2; }
+  inline void SwapAddr() { std::swap(src_, dst_); }
+  inline void set_type(int type) { type_ = type; }
+  inline int type() const { return type_; }
+  inline void set_target(int first, int second) {
+    target_first_ = first;
+    target_second_ = second;
+  }
+  inline int target_first() const { return target_first_; }
+  inline int target_second() const { return target_second_; }
   /**
    * Copy src and dst address, including first, id, flag
    */
-  virtual BaseMsg* CopyAddr()=0;
-  virtual void SetAddr(BaseMsg* msg)=0;
+  inline Msg* CopyAddr() {
+    Msg* msg = new Msg();
+    msg->src_ = src_;
+    msg->dst_ = dst_;
+    return msg;
+  }
+  inline void SetAddr(Msg* msg) {
+    src_ = msg->src_;
+    dst_ = msg->dst_;
+  }
 
   /**
    * Add a frame (a chunck of bytes) into the message
    */
-  virtual void add_frame(const void*, int nBytes)=0;
-  virtual int frame_size()=0;
-  virtual void* frame_data()=0;
+  void add_frame(const void* addr, int nBytes);
+  int frame_size();
+  void* frame_data();
   /**
     * Move the cursor to the next frame
     * @return true if the next frame is not NULL; otherwise false
     */
-  virtual bool next_frame()=0;
-};
-// TODO make it a compiler argument
-#define USE_ZMQ
-
+  bool next_frame();
 #ifdef USE_ZMQ
-class Msg : public BaseMsg{
- public:
-  Msg() {
-    msg_=zmsg_new();
-  }
-  virtual ~Msg(){
-    if(msg_!=NULL)
-      zmsg_destroy(&msg_);
-  }
-  virtual void set_src(int first, int second, int flag){
-    src_=(first<<kOff1)|(second<<kOff2)|flag;
-  }
-  virtual void set_dst(int first, int second, int flag){
-    dst_=(first<<kOff1)|(second<<kOff2)|flag;
-  }
-  virtual void set_src(int procs_id, int flag){
-    set_src(procs_id, 0, flag);
-  }
-  virtual void set_dst(int procs_id, int flag){
-    set_dst(procs_id, 0, flag);
-  }
-  int src() const {
-    return src_;
-  }
-  int dst() const {
-    return dst_;
-  }
-  virtual int src_first() const {
-    int ret=src_>>kOff1;
-    return ret;
-  }
-
-  virtual int dst_first() const{
-    int ret=dst_>>kOff1;
-    return ret;
-  }
-  virtual int src_second() const{
-    int ret=(src_&kMask1)>>kOff2;
-    return ret;
-  }
-  virtual int dst_second() const{
-    int ret=(dst_&kMask1)>>kOff2;
-    return ret;
-  }
-  virtual int src_flag() const{
-    int ret=src_&kMask2;
-    return ret;
-  }
-  virtual int dst_flag() const{
-    int ret=dst_&kMask2;
-    return ret;
-  }
-
-  void SwapAddr(){
-    std::swap(src_,dst_);
-  }
-
-  virtual void set_type(int type){
-    type_=type;
-  }
-  virtual int type() const{
-    return type_;
-  }
-
-  virtual void set_target(int first, int second){
-    target_first_=first;
-    target_second_=second;
-  }
-  virtual int target_first() const{
-    return target_first_;
-  }
-  virtual int target_second() const{
-    return target_second_;
-  }
-
- virtual BaseMsg* CopyAddr(){
-    Msg* msg=new Msg();
-    msg->src_=src_;
-    msg->dst_=dst_;
-    return msg;
-  }
-
-  virtual void SetAddr(BaseMsg* msg){
-    src_=(static_cast<Msg*>(msg))->src_;
-    dst_=(static_cast<Msg*>(msg))->dst_;
-  }
-
-  virtual void add_frame(const void* addr, int nBytes){
-    zmsg_addmem(msg_, addr, nBytes);
-  }
-  virtual int frame_size(){
-    return zframe_size(frame_);
-  }
-
-  virtual void* frame_data(){
-    return zframe_data(frame_);
-  }
-
-  virtual bool next_frame(){
-    frame_=zmsg_next(msg_);
-    return frame_!=NULL;
-  }
-
-  void ParseFromZmsg(zmsg_t* msg){
-    char* tmp=zmsg_popstr(msg);
-    sscanf(tmp, "%d %d %d %d %d",
-        &src_, &dst_, &type_, &target_first_, &target_second_);
-    //LOG(ERROR)<<"recv "<<src_<<" "<<dst_<<" "<<target_;
-    frame_=zmsg_next(msg);
-    msg_=msg;
-  }
-
-  zmsg_t* DumpToZmsg(){
-    zmsg_pushstrf(msg_, "%d %d %d %d %d",
-        src_, dst_, type_, target_first_, target_second_);
-    //LOG(ERROR)<<"send "<<src_<<" "<<dst_<<" "<<target_;
-    zmsg_t *tmp=msg_;
-    msg_=NULL;
-    return tmp;
-  }
+  void ParseFromZmsg(zmsg_t* msg);
+  zmsg_t* DumpToZmsg();
+#endif
 
  protected:
-  static const unsigned int kOff1=16, kOff2=4;
-  static const unsigned int kMask1=(1<<kOff1)-1, kMask2=(1<<kOff2)-1;
-  int src_, dst_;
-  int type_, target_first_, target_second_;
-  zmsg_t* msg_;
-  zframe_t *frame_;
-};
+  static const unsigned int kOff1 = 16;
+  static const unsigned int kOff2 = 4;
+  static const unsigned int kMask1 = (1 << kOff1) - 1;
+  static const unsigned int kMask2 = (1 << kOff2) - 1;
+
+  int src_ = 0;
+  int dst_ = 0;
+  int type_ = 0;
+  int target_first_ = 0;
+  int target_second_ = 0;
+#ifdef USE_ZMQ
+  zmsg_t* msg_ = nullptr;
+  zframe_t *frame_ = nullptr;
 #endif
-inline void DeleteMsg(Msg** msg){
+};
+
+inline void DeleteMsg(Msg** msg) {
   delete *msg;
-  *msg=nullptr;
+  *msg = nullptr;
 }
 
+}  // namespace singa
 
-} /* singa */
-
-#endif // INCLUDE_COMMUNICATION_MSG_H_
+#endif  // SINGA_COMMUNICATION_MSG_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/767bad29/include/communication/socket.h
----------------------------------------------------------------------
diff --git a/include/communication/socket.h b/include/communication/socket.h
index 4b1f467..09c71e3 100644
--- a/include/communication/socket.h
+++ b/include/communication/socket.h
@@ -5,7 +5,7 @@
 #include "communication/msg.h"
 namespace singa {
 
-const string kInprocRouterEndpoint="inproc://router";
+const std::string kInprocRouterEndpoint="inproc://router";
 class Socket{
   public:
   Socket(){}
@@ -83,7 +83,7 @@ class Dealer : public Socket{
     * format, i.e., IP:port, where IP is the connected process.
     * @return 1 connection sets up successfully; 0 otherwise
     */
-  virtual int Connect(string endpoint);
+  virtual int Connect(std::string endpoint);
   virtual int Send(Msg** msg);
   virtual Msg* Receive();
   virtual void* InternalID() const{
@@ -119,7 +119,7 @@ class Router : public Socket{
   * intra-process connection.
   * @return number of connected dealers.
   */
-  virtual int Bind(string endpoint);
+  virtual int Bind(std::string endpoint);
  /**
    * If the destination socket has not connected yet, buffer this the message.
    */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/767bad29/src/communication/msg.cc
----------------------------------------------------------------------
diff --git a/src/communication/msg.cc b/src/communication/msg.cc
index 80f2304..2a22b05 100644
--- a/src/communication/msg.cc
+++ b/src/communication/msg.cc
@@ -1,5 +1,50 @@
 #include "communication/msg.h"
 
 namespace singa {
-} /* singa */
+
+#ifdef USE_ZMQ
+Msg::Msg() {
+  msg_ = zmsg_new();
+}
+
+Msg::~Msg() {
+  if (msg_ != nullptr)
+    zmsg_destroy(&msg_);
+}
+
+void Msg::add_frame(const void* addr, int nBytes) {
+  zmsg_addmem(msg_, addr, nBytes);
+}
+
+int Msg::frame_size() {
+  return zframe_size(frame_);
+}
+
+void* Msg::frame_data() {
+  return zframe_data(frame_);
+}
+
+bool Msg::next_frame() {
+  frame_ = zmsg_next(msg_);
+  return frame_ != nullptr;
+}
+
+void Msg::ParseFromZmsg(zmsg_t* msg) {
+  char* tmp = zmsg_popstr(msg);
+  sscanf(tmp, "%d %d %d %d %d",
+         &src_, &dst_, &type_, &target_first_, &target_second_);
+  frame_ = zmsg_next(msg);
+  msg_ = msg;
+}
+
+zmsg_t* Msg::DumpToZmsg() {
+  zmsg_pushstrf(msg_, "%d %d %d %d %d",
+                src_, dst_, type_, target_first_, target_second_);
+  zmsg_t *tmp = msg_;
+  msg_ = nullptr;
+  return tmp;
+}
+#endif
+
+}  // namespace singa
 

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/767bad29/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
index 385950b..30dff5d 100644
--- a/src/communication/socket.cc
+++ b/src/communication/socket.cc
@@ -24,7 +24,7 @@ Dealer::Dealer(int id):id_(id){
   poller_=zpoller_new(dealer_);
 }
 
-int Dealer::Connect(string endpoint){
+int Dealer::Connect(std::string endpoint){
   if(endpoint.length())
     CHECK_EQ(zsock_connect(dealer_,"%s", endpoint.c_str()),0);
   return 1;
@@ -56,7 +56,7 @@ Router::Router(int bufsize){
   CHECK_NOTNULL(router_);
   poller_=zpoller_new(router_);
 }
-int Router::Bind(string endpoint){
+int Router::Bind(std::string endpoint){
   if(endpoint.length())
     CHECK_EQ(zsock_bind(router_, "%s", endpoint.c_str()),0);
   return 1;


Mime
View raw message