singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wang...@apache.org
Subject [01/18] incubator-singa git commit: SINGA-21 Code review
Date Wed, 24 Jun 2015 13:35:44 GMT
Repository: incubator-singa
Updated Branches:
  refs/heads/master 56d32e8a0 -> 7d39f8813


SINGA-21 Code review

review socket.h and socket.cc
  -- remove BasePoll class
  -- rename Socket to SocketInterface
  -- refine functions: Dealer.Connect, Router.Bind
  -- formatting


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/b2d7332c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/b2d7332c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/b2d7332c

Branch: refs/heads/master
Commit: b2d7332c62c8ec8531c1190babb85873fd3f8aac
Parents: b0483f2
Author: wang sheng <wangsheng1001@gmail.com>
Authored: Mon Jun 22 16:16:36 2015 +0800
Committer: wang wei <wangwei@comp.nus.edu.sg>
Committed: Wed Jun 24 16:57:59 2015 +0800

----------------------------------------------------------------------
 include/communication/msg.h    |  12 +--
 include/communication/socket.h | 161 +++++++++++++++++++-----------------
 src/communication/socket.cc    | 159 ++++++++++++++++++++---------------
 3 files changed, 184 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2d7332c/include/communication/msg.h
----------------------------------------------------------------------
diff --git a/include/communication/msg.h b/include/communication/msg.h
index e63c3cf..6ff887f 100644
--- a/include/communication/msg.h
+++ b/include/communication/msg.h
@@ -1,14 +1,17 @@
 #ifndef SINGA_COMMUNICATION_MSG_H_
 #define SINGA_COMMUNICATION_MSG_H_
 
-#include <czmq.h>
-#include <glog/logging.h>
+// TODO(wangwei): make it a compiler argument
+#define USE_ZMQ
+
 #include <algorithm>
 #include <string>
 
-namespace singa {
+#ifdef USE_ZMQ
+#include <czmq.h>
+#endif
 
-#define USE_ZMQ
+namespace singa {
 
 class Msg {
  public:
@@ -60,7 +63,6 @@ class Msg {
     src_ = msg->src_;
     dst_ = msg->dst_;
   }
-
   /**
    * Add a frame (a chunck of bytes) into the message
    */

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2d7332c/include/communication/socket.h
----------------------------------------------------------------------
diff --git a/include/communication/socket.h b/include/communication/socket.h
index 09c71e3..77c701a 100644
--- a/include/communication/socket.h
+++ b/include/communication/socket.h
@@ -1,81 +1,82 @@
-#ifndef INCLUDE_COMMUNICATION_SOCKET_H_
-#define INCLUDE_COMMUNICATION_SOCKET_H_
+#ifndef SINGA_COMMUNICATION_SOCKET_H_
+#define SINGA_COMMUNICATION_SOCKET_H_
+
 #include <map>
+#include <string>
 #include <vector>
+
 #include "communication/msg.h"
+
+#ifdef USE_ZMQ
+#include <czmq.h>
+#endif
+
 namespace singa {
 
-const std::string kInprocRouterEndpoint="inproc://router";
-class Socket{
-  public:
-  Socket(){}
-  virtual ~Socket(){}
+const char kInprocRouterEndpoint[] = "inproc://router";
+
+class SocketInterface {
+ public:
+  virtual ~SocketInterface() {}
   /**
-    * Send a message to connected socket(s), non-blocking. The message will
-    * be deallocated after sending, thus should not be used after calling Send();
-    * @param  the message to be sent
+    * Send a message to connected socket(s), non-blocking. The message 
+    * will be deallocated after sending, thus should not be used after 
+    * calling Send();
+    * 
+    * @param msg The message to be sent
     * @return 1 for success queuing the message for sending, 0 for failure
     */
-  virtual int Send(Msg** msg)=0;
+  virtual int Send(Msg** msg) = 0;
   /**
     * Receive a message from any connected socket.
     *
     * @return a message pointer if success; nullptr if failure
     */
-  virtual Msg* Receive()=0;
+  virtual Msg* Receive() = 0;
   /**
    * @return Identifier of the implementation dependent socket. E.g., zsock_t*
    * for ZeroMQ implementation and rank for MPI implementation.
    */
-  virtual void* InternalID() const=0;
-
- protected:
-  int local_id_;
+  virtual void* InternalID() const = 0;
 };
 
-class BasePoller{
+class Poller {
  public:
+  Poller();
   /**
     * Add a socket for polling; Multiple sockets can be polled together by
     * adding them into the same poller.
     */
-  virtual void Add(Socket* socket)=0;
+  void Add(SocketInterface* socket);
   /**
     * Poll for all sockets added into this poller.
-    * @param timeout stop after this number of mseconds
-    * @return pointer to the socket if it has one message in the receiving
+    * @param timeout Stop after this number of mseconds
+    * @return pointer To the socket if it has one message in the receiving
     * queue; nullptr if no message in any sockets,
     */
-  virtual Socket* Wait(int timeout)=0;
-};
+  SocketInterface* Wait(int duration);
 
-#define USE_ZMQ
-#include <czmq.h>
-
-#ifdef USE_ZMQ
-class Poller: public BasePoller{
- public:
-  Poller();
-  virtual void Add(Socket* socket);
-  virtual Socket* Wait(int duration);
  protected:
+#ifdef USE_ZMQ
   zpoller_t *poller_;
-  std::map<zsock_t*, Socket*> zsock2Socket_;
+  std::map<zsock_t*, SocketInterface*> zsock2Socket_;
+#endif
 };
 
-class Dealer : public Socket{
+class Dealer : public SocketInterface {
  public:
   /*
-   * @param id local dealer ID within a procs if the dealer is from worker or
+   * @param id Local dealer ID within a procs if the dealer is from worker or
    * server thread, starts from 1 (0 is used by the router); or the connected
    * remote procs ID for inter-process dealers from the stub thread.
    */
-  Dealer(int id=-1);
-  virtual ~Dealer();
+  Dealer();
+  explicit Dealer(int id);
+  ~Dealer() override;
   /**
     * Setup the connection with the router.
     *
-    * @param endpoint identifier of the router. For intra-process
+    * @param endpoint Identifier of the router. For intra-process
     * connection, the endpoint follows the format of ZeroMQ, i.e.,
     * starting with "inproc://"; in Singa, since each process has one
     * router, hence we can fix the endpoint to be "inproc://router" for
@@ -83,62 +84,66 @@ class Dealer : public Socket{
     * format, i.e., IP:port, where IP is the connected process.
     * @return 1 connection sets up successfully; 0 otherwise
     */
-  virtual int Connect(std::string endpoint);
-  virtual int Send(Msg** msg);
-  virtual Msg* Receive();
-  virtual void* InternalID() const{
-    return dealer_;
-  }
+  int Connect(const std::string& endpoint);
+  int Send(Msg** msg) override;
+  Msg* Receive() override;
+  void* InternalID() const override;
+
  protected:
-  int id_;
-  zsock_t* dealer_;
-  zpoller_t* poller_;
+  int id_ = -1;
+#ifdef USE_ZMQ
+  zsock_t* dealer_ = nullptr;
+  zpoller_t* poller_ = nullptr;
+#endif
 };
 
-class Router : public Socket{
+class Router : public SocketInterface {
  public:
-  virtual ~Router();
+  Router();
   /**
-   * Constructor.
-   *
    * There is only one router per procs, hence its local id is 0 and is not set
    * explicitly.
    *
-   * @param bufsize buffer at most this number of messages
+   * @param bufsize Buffer at most this number of messages
+   */
+  explicit Router(int bufsize);
+  ~Router() override;
+  /**
+   * Setup the connection with dealers.
+   *
+   * It automatically binds to the endpoint for intra-process communication,
+   * i.e., "inproc://router".
+   *
+   * @param endpoint The identifier for the Dealer socket in other process
+   * to connect. It has the format IP:Port, where IP is the host machine.
+   * If endpoint is empty, it means that all connections are
+   * intra-process connection.
+   * @return number of connected dealers.
    */
-  Router(int bufsize=100);
- /**
-  * Setup the connection with dealers.
-  *
-  * It automatically binds to the endpoint for intra-process communication,
-  * i.e., "inproc://router".
-  *
-  * @param endpoint the identifier for the Dealer socket in other process
-  * to connect. It has the format IP:Port, where IP is the host machine.
-  * If endpoint is empty, it means that all connections are
-  * intra-process connection.
-  * @return number of connected dealers.
-  */
-  virtual int Bind(std::string endpoint);
- /**
+  int Bind(const std::string& endpoint);
+  /**
    * If the destination socket has not connected yet, buffer this the message.
    */
-  virtual int Send(Msg** msg);
-  virtual Msg* Receive();
-  virtual void* InternalID() const{
-    return router_;
-  }
+  int Send(Msg** msg) override;
+  Msg* Receive() override;
+  void* InternalID() const override;
+
  protected:
-  zsock_t* router_;
-  zpoller_t* poller_;
+  int nBufmsg_ = 0;
+  int bufsize_ = 100;
+#ifdef USE_ZMQ
+  zsock_t* router_ = nullptr;
+  zpoller_t* poller_ = nullptr;
   std::map<int, zframe_t*> id2addr_;
   std::map<int, std::vector<zmsg_t*>> bufmsg_;
-  int nBufmsg_, bufsize_;
+#endif
 };
 
-#elif USE_MPI
-vector<shared_ptr<SafeQueue>> MPIQueues;
+#ifdef USE_MPI
+// TODO(wangsheng): add intra-process communication using shared queue
+std::vector<SafeQueue*> MPIQueues;
 #endif
-} /* singa */
 
-#endif // INCLUDE_COMMUNICATION_SOCKET_H_
+}  // namespace singa
+
+#endif  // SINGA_COMMUNICATION_SOCKET_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2d7332c/src/communication/socket.cc
----------------------------------------------------------------------
diff --git a/src/communication/socket.cc b/src/communication/socket.cc
index 30dff5d..5321724 100644
--- a/src/communication/socket.cc
+++ b/src/communication/socket.cc
@@ -1,120 +1,149 @@
 #include "communication/socket.h"
 
+#include <glog/logging.h>
+
 namespace singa {
-Poller::Poller(){
-  poller_=zpoller_new(NULL);
+
+#ifdef USE_ZMQ
+Poller::Poller() {
+  poller_ = zpoller_new(nullptr);
 }
 
-void Poller::Add(Socket* socket){
-  zsock_t* zsock=static_cast<zsock_t*>(socket->InternalID());
+void Poller::Add(SocketInterface* socket) {
+  zsock_t* zsock = static_cast<zsock_t*>(socket->InternalID());
   zpoller_add(poller_, zsock);
-  zsock2Socket_[zsock]=socket;
+  zsock2Socket_[zsock] = socket;
 }
 
-Socket* Poller::Wait(int timeout){
-  zsock_t* sock=(zsock_t*)zpoller_wait(poller_, timeout);
-  if(sock!=NULL)
+SocketInterface* Poller::Wait(int timeout) {
+  zsock_t* sock = static_cast<zsock_t*>(zpoller_wait(poller_, timeout));
+  if (sock != nullptr)
     return zsock2Socket_[sock];
-  else return nullptr;
+  return nullptr;
 }
 
-Dealer::Dealer(int id):id_(id){
-  dealer_=zsock_new(ZMQ_DEALER);
+Dealer::Dealer() : Dealer(-1) {}
+
+Dealer::Dealer(int id) : id_(id) {
+  dealer_ = zsock_new(ZMQ_DEALER);
   CHECK_NOTNULL(dealer_);
-  poller_=zpoller_new(dealer_);
+  poller_ = zpoller_new(dealer_);
+  CHECK_NOTNULL(poller_);
 }
 
-int Dealer::Connect(std::string endpoint){
-  if(endpoint.length())
-    CHECK_EQ(zsock_connect(dealer_,"%s", endpoint.c_str()),0);
-  return 1;
+Dealer::~Dealer() {
+  zsock_destroy(&dealer_);
+}
+
+int Dealer::Connect(const std::string& endpoint) {
+  CHECK_GT(endpoint.length(), 0);
+  if (endpoint.length()) {
+    CHECK_EQ(zsock_connect(dealer_, "%s", endpoint.c_str()), 0);
+    return 1;
+  }
+  return 0;
 }
-int Dealer::Send(Msg** msg){
-  zmsg_t* zmsg=(*msg)->DumpToZmsg();
+
+int Dealer::Send(Msg** msg) {
+  zmsg_t* zmsg = (*msg)->DumpToZmsg();
   zmsg_send(&zmsg, dealer_);
   delete *msg;
-  *msg=NULL;
+  *msg = nullptr;
   return 1;
 }
 
-Msg* Dealer::Receive(){
-  zmsg_t* zmsg=zmsg_recv(dealer_);
-  if(zmsg==NULL)
+Msg* Dealer::Receive() {
+  zmsg_t* zmsg = zmsg_recv(dealer_);
+  if (zmsg == nullptr)
     return nullptr;
-  Msg* msg=new Msg();
+  Msg* msg = new Msg();
   msg->ParseFromZmsg(zmsg);
   return msg;
 }
-Dealer::~Dealer(){
-  zsock_destroy(&dealer_);
+
+void* Dealer::InternalID() const {
+  return dealer_;
 }
 
-Router::Router(int bufsize){
-  nBufmsg_=0;
-  bufsize_=bufsize;
-  router_=zsock_new(ZMQ_ROUTER);
+Router::Router() : Router(100) {}
+
+Router::Router(int bufsize) {
+  nBufmsg_ = 0;
+  bufsize_ = bufsize;
+  router_ = zsock_new(ZMQ_ROUTER);
   CHECK_NOTNULL(router_);
-  poller_=zpoller_new(router_);
+  poller_ = zpoller_new(router_);
+  CHECK_NOTNULL(poller_);
 }
-int Router::Bind(std::string endpoint){
-  if(endpoint.length())
-    CHECK_EQ(zsock_bind(router_, "%s", endpoint.c_str()),0);
-  return 1;
+
+Router::~Router() {
+  zsock_destroy(&router_);
+  for (auto it : id2addr_)
+    zframe_destroy(&it.second);
+  for (auto it : bufmsg_) {
+    for (auto *msg : it.second)
+      zmsg_destroy(&msg);
+  }
+}
+
+int Router::Bind(const std::string& endpoint) {
+  CHECK_GT(endpoint.length(), 0);
+  if (endpoint.length()) {
+    CHECK_EQ(zsock_bind(router_, "%s", endpoint.c_str()), 0);
+    return 1;
+  }
+  return 0;
 }
 
-int Router::Send(Msg **msg){
-  zmsg_t* zmsg=(*msg)->DumpToZmsg();
-  int dstid=(*msg)->dst();
-  if(id2addr_.find(dstid)!=id2addr_.end()){
+int Router::Send(Msg **msg) {
+  zmsg_t* zmsg = (*msg)->DumpToZmsg();
+  int dstid = (*msg)->dst();
+  if (id2addr_.find(dstid) != id2addr_.end()) {
     // the connection has already been set up
-    zframe_t* addr=zframe_dup(id2addr_[dstid]);
+    zframe_t* addr = zframe_dup(id2addr_[dstid]);
     zmsg_prepend(zmsg, &addr);
     zmsg_send(&zmsg, router_);
-  }else{
+  } else {
     // the connection is not ready, buffer the message
-    if(bufmsg_.size()==0)
-      nBufmsg_=0;
+    if (bufmsg_.size() == 0)
+      nBufmsg_ = 0;
     bufmsg_[dstid].push_back(zmsg);
-    nBufmsg_++;
+    ++nBufmsg_;
     CHECK_LE(nBufmsg_, bufsize_);
   }
   delete *msg;
-  *msg=NULL;
+  *msg = nullptr;
   return 1;
 }
 
-Msg* Router::Receive(){
-  zmsg_t* zmsg=zmsg_recv(router_);
-  if(zmsg==NULL)
+Msg* Router::Receive() {
+  zmsg_t* zmsg = zmsg_recv(router_);
+  if (zmsg == nullptr)
     return nullptr;
-  zframe_t* dealer=zmsg_pop(zmsg);
-  Msg* msg=new Msg();
+  zframe_t* dealer = zmsg_pop(zmsg);
+  Msg* msg = new Msg();
   msg->ParseFromZmsg(zmsg);
-  if (id2addr_.find(msg->src())==id2addr_.end()){
+  if (id2addr_.find(msg->src()) == id2addr_.end()) {
     // new connection, store the sender's identfier and send buffered messages
     // for it
-    id2addr_[msg->src()]=dealer;
-    if(bufmsg_.find(msg->src())!=bufmsg_.end()){
-      for(auto& it: bufmsg_.at(msg->src())){
-        zframe_t* addr=zframe_dup(dealer);
+    id2addr_[msg->src()] = dealer;
+    if (bufmsg_.find(msg->src()) != bufmsg_.end()) {
+      for (auto& it : bufmsg_.at(msg->src())) {
+        zframe_t* addr = zframe_dup(dealer);
         zmsg_prepend(it, &addr);
         zmsg_send(&it, router_);
       }
       bufmsg_.erase(msg->src());
     }
-  }
-  else
+  } else {
     zframe_destroy(&dealer);
+  }
   return msg;
 }
 
-Router::~Router(){
-  zsock_destroy(&router_);
-  for(auto it: id2addr_)
-    zframe_destroy(&it.second);
-  for(auto it: bufmsg_){
-    for(auto *msg: it.second)
-      zmsg_destroy(&msg);
-  }
+void* Router::InternalID() const {
+  return router_;
 }
-} /* singa */
+#endif
+
+}  // namespace singa


Mime
View raw message