singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [4/4] incubator-singa git commit: SINGA-156 Remove the dependency on ZMQ for single process training
Date Mon, 04 Apr 2016 09:09:47 GMT
SINGA-156 Remove the dependency on ZMQ for single process training

bug fixing in communication part
check cpplint


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/914c1e72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/914c1e72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/914c1e72

Branch: refs/heads/master
Commit: 914c1e722d3c5e81c2f2bb4b1ffbd14a63f4aa3a
Parents: 42f5253
Author: WANG Sheng <wangsheng1001@gmail.com>
Authored: Mon Apr 4 13:53:06 2016 +0800
Committer: WANG Sheng <wangsheng1001@gmail.com>
Committed: Mon Apr 4 16:54:19 2016 +0800

----------------------------------------------------------------------
 include/singa/comm/socket.h        |  18 ++--
 include/singa/utils/safe_queue.h   | 141 +++++++++++++++++++-------------
 src/comm/msg.cc                    |   5 +-
 src/comm/socket.cc                 |   4 +-
 src/stub.cc                        |   3 +-
 src/test/test_connection_layers.cc |  12 ++-
 6 files changed, 107 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/include/singa/comm/socket.h
----------------------------------------------------------------------
diff --git a/include/singa/comm/socket.h b/include/singa/comm/socket.h
index 3194d8c..40d4cc3 100644
--- a/include/singa/comm/socket.h
+++ b/include/singa/comm/socket.h
@@ -43,17 +43,17 @@ class Dealer {
    /**
     * @param id used for identifying the msg queue of this dealer.
     */
-   Dealer(int id);
+   explicit Dealer(int id);
   ~Dealer();
   /**
-    * Setup the connection with the remote router.
-    *
-    * For local router, there is no need to connect it.
-    *
-    * @param endpoint Identifier of the remote router to connect. It follows
-    * ZeroMQ's format, i.e., IP:port, where IP is the connected process.
-    * @return 1 connection sets up successfully; 0 otherwise
-    */
+   * Setup the connection with the remote router.
+   *
+   * For local router, there is no need to connect it.
+   *
+   * @param endpoint Identifier of the remote router to connect. It follows
+   * ZeroMQ's format, i.e., IP:port, where IP is the connected process.
+   * @return 1 connection sets up successfully; 0 otherwise
+   */
   int Connect(const std::string& endpoint);
   /**
    * Send a message to the local router (id=-1) or remote outer. It is

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/include/singa/utils/safe_queue.h
----------------------------------------------------------------------
diff --git a/include/singa/utils/safe_queue.h b/include/singa/utils/safe_queue.h
index 99adbf0..31df1ef 100644
--- a/include/singa/utils/safe_queue.h
+++ b/include/singa/utils/safe_queue.h
@@ -1,7 +1,35 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+/**
+ * The code is adapted from following source:
+ * http://gnodebian.blogspot.sg/2013/07/a-thread-safe-asynchronous-queue-in-c11.html
+ * under Creative Commons Attribution 4.0 International Public License
+ */
+
 #ifndef SINGA_UTILS_SAFE_QUEUE_H_
 #define SINGA_UTILS_SAFE_QUEUE_H_
 
 // source: http://gnodebian.blogspot.sg/2013/07/a-thread-safe-asynchronous-queue-in-c11.html
+#include <algorithm>
 #include <queue>
 #include <list>
 #include <mutex>
@@ -12,33 +40,31 @@
 /** A thread-safe asynchronous queue */
 template <class T, class Container = std::list<T>>
 class SafeQueue {
-
   typedef typename Container::value_type value_type;
   typedef typename Container::size_type size_type;
   typedef Container container_type;
 
  public:
-
   /*! Create safe queue. */
   SafeQueue() = default;
-  SafeQueue (SafeQueue&& sq) {
-    m_queue = std::move (sq.m_queue);
+  SafeQueue(SafeQueue&& sq) {
+    m_queue = std::move(sq.m_queue);
   }
-  SafeQueue (const SafeQueue& sq) {
-    std::lock_guard<std::mutex> lock (sq.m_mutex);
+  SafeQueue(const SafeQueue& sq) {
+    std::lock_guard<std::mutex> lock(sq.m_mutex);
     m_queue = sq.m_queue;
   }
 
   /*! Destroy safe queue. */
   ~SafeQueue() {
-    std::lock_guard<std::mutex> lock (m_mutex);
+    std::lock_guard<std::mutex> lock(m_mutex);
   }
 
   /**
    * Sets the maximum number of items in the queue. Defaults is 0: No limit
    * \param[in] item An item.
    */
-  void set_max_num_items (unsigned int max_num_items) {
+  void set_max_num_items(unsigned int max_num_items) {
     m_max_num_items = max_num_items;
   }
 
@@ -47,13 +73,13 @@ class SafeQueue {
    * \param[in] item An item.
    * \return true if an item was pushed into the queue
    */
-  bool push (const value_type& item) {
-    std::lock_guard<std::mutex> lock (m_mutex);
+  bool push(const value_type& item) {
+    std::lock_guard<std::mutex> lock(m_mutex);
 
     if (m_max_num_items > 0 && m_queue.size() > m_max_num_items)
       return false;
 
-    m_queue.push (item);
+    m_queue.push(item);
     m_condition.notify_one();
     return true;
   }
@@ -63,13 +89,13 @@ class SafeQueue {
    * \param[in] item An item.
    * \return true if an item was pushed into the queue
    */
-  bool push (const value_type&& item) {
-    std::lock_guard<std::mutex> lock (m_mutex);
+  bool push(const value_type&& item) {
+    std::lock_guard<std::mutex> lock(m_mutex);
 
     if (m_max_num_items > 0 && m_queue.size() > m_max_num_items)
       return false;
 
-    m_queue.push (item);
+    m_queue.push(item);
     m_condition.notify_one();
     return true;
   }
@@ -78,12 +104,11 @@ class SafeQueue {
    *  Pops item from the queue. If queue is empty, this function blocks until item becomes
available.
    * \param[out] item The item.
    */
-  void pop (value_type& item) {
-    std::unique_lock<std::mutex> lock (m_mutex);
-    m_condition.wait (lock, [this]() // Lambda funct
-        {
+  void pop(value_type& item) {
+    std::unique_lock<std::mutex> lock(m_mutex);
+    m_condition.wait(lock, [this]() {  // Lambda funct
         return !m_queue.empty();
-        });
+      });
     item = m_queue.front();
     m_queue.pop();
   }
@@ -94,13 +119,12 @@ class SafeQueue {
    *  If queue is empty, this function blocks until item becomes available.
    * \param[out] item The item.
    */
-  void move_pop (value_type& item) {
-    std::unique_lock<std::mutex> lock (m_mutex);
-    m_condition.wait (lock, [this]() // Lambda funct
-        {
+  void move_pop(value_type& item) {
+    std::unique_lock<std::mutex> lock(m_mutex);
+    m_condition.wait(lock, [this]() {  // Lambda funct
         return !m_queue.empty();
-        });
-    item = std::move (m_queue.front());
+      });
+    item = std::move(m_queue.front());
     m_queue.pop();
   }
 
@@ -109,8 +133,8 @@ class SafeQueue {
    * \param[out] item The item.
    * \return False is returned if no item is available.
    */
-  bool try_pop (value_type& item) {
-    std::unique_lock<std::mutex> lock (m_mutex);
+  bool try_pop(value_type& item) {
+    std::unique_lock<std::mutex> lock(m_mutex);
 
     if (m_queue.empty())
       return false;
@@ -126,13 +150,13 @@ class SafeQueue {
    * \param[out] item The item.
    * \return False is returned if no item is available.
    */
-  bool try_move_pop (value_type& item) {
-    std::unique_lock<std::mutex> lock (m_mutex);
+  bool try_move_pop(value_type& item) {
+    std::unique_lock<std::mutex> lock(m_mutex);
 
     if (m_queue.empty())
       return false;
 
-    item = std::move (m_queue.front());
+    item = std::move(m_queue.front());
     m_queue.pop();
     return true;
   }
@@ -143,15 +167,15 @@ class SafeQueue {
    * \param[in] timeout The number of microseconds to wait.
    * \return true if get an item from the queue, false if no item is received before the
timeout.
    */
-  bool timeout_pop (value_type& item, std::uint64_t timeout) {
-    std::unique_lock<std::mutex> lock (m_mutex);
+  bool timeout_pop(value_type& item, std::uint64_t timeout) {
+    std::unique_lock<std::mutex> lock(m_mutex);
 
-    if (m_queue.empty())
-    {
+    if (m_queue.empty()) {
       if (timeout == 0)
         return false;
 
-      if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout)
+      if (m_condition.wait_for(lock, std::chrono::microseconds(timeout))
+          == std::cv_status::timeout)
         return false;
     }
 
@@ -168,19 +192,19 @@ class SafeQueue {
    * \param[in] timeout The number of microseconds to wait.
    * \return true if get an item from the queue, false if no item is received before the
timeout.
    */
-  bool timeout_move_pop (value_type& item, std::uint64_t timeout) {
-    std::unique_lock<std::mutex> lock (m_mutex);
+  bool timeout_move_pop(value_type& item, std::uint64_t timeout) {
+    std::unique_lock<std::mutex> lock(m_mutex);
 
-    if (m_queue.empty())
-    {
+    if (m_queue.empty()) {
       if (timeout == 0)
         return false;
 
-      if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout)
+      if (m_condition.wait_for(lock, std::chrono::microseconds(timeout))
+          == std::cv_status::timeout)
         return false;
     }
 
-    item = std::move (m_queue.front());
+    item = std::move(m_queue.front());
     m_queue.pop();
     return true;
   }
@@ -190,7 +214,7 @@ class SafeQueue {
    * \return Number of items in the queue.
    */
   size_type size() const {
-    std::lock_guard<std::mutex> lock (m_mutex);
+    std::lock_guard<std::mutex> lock(m_mutex);
     return m_queue.size();
   }
 
@@ -199,7 +223,7 @@ class SafeQueue {
    * \return true if queue is empty.
    */
   bool empty() const {
-    std::lock_guard<std::mutex> lock (m_mutex);
+    std::lock_guard<std::mutex> lock(m_mutex);
     return m_queue.empty();
   }
 
@@ -207,11 +231,11 @@ class SafeQueue {
    *  Swaps the contents.
    * \param[out] sq The SafeQueue to swap with 'this'.
    */
-  void swap (SafeQueue& sq) {
+  void swap(SafeQueue& sq) {
     if (this != &sq) {
-      std::lock_guard<std::mutex> lock1 (m_mutex);
-      std::lock_guard<std::mutex> lock2 (sq.m_mutex);
-      m_queue.swap (sq.m_queue);
+      std::lock_guard<std::mutex> lock1(m_mutex);
+      std::lock_guard<std::mutex> lock2(sq.m_mutex);
+      m_queue.swap(sq.m_queue);
 
       if (!m_queue.empty())
         m_condition.notify_all();
@@ -224,10 +248,10 @@ class SafeQueue {
   /*! The copy assignment operator */
   SafeQueue& operator= (const SafeQueue& sq) {
     if (this != &sq) {
-      std::lock_guard<std::mutex> lock1 (m_mutex);
-      std::lock_guard<std::mutex> lock2 (sq.m_mutex);
-      std::queue<T, Container> temp {sq.m_queue};
-      m_queue.swap (temp);
+      std::lock_guard<std::mutex> lock1(m_mutex);
+      std::lock_guard<std::mutex> lock2(sq.m_mutex);
+      std::queue<T, Container> temp{sq.m_queue};
+      m_queue.swap(temp);
 
       if (!m_queue.empty())
         m_condition.notify_all();
@@ -238,17 +262,15 @@ class SafeQueue {
 
   /*! The move assignment operator */
   SafeQueue& operator= (SafeQueue && sq) {
-    std::lock_guard<std::mutex> lock (m_mutex);
-    m_queue = std::move (sq.m_queue);
+    std::lock_guard<std::mutex> lock(m_mutex);
+    m_queue = std::move(sq.m_queue);
 
-    if (!m_queue.empty())  m_condition.notify_all();
+    if (!m_queue.empty()) m_condition.notify_all();
 
     return *this;
   }
 
-
  private:
-
   std::queue<T, Container> m_queue;
   mutable std::mutex m_mutex;
   std::condition_variable m_condition;
@@ -257,7 +279,8 @@ class SafeQueue {
 
 /*! Swaps the contents of two SafeQueue objects. */
 template <class T, class Container>
-void swap (SafeQueue<T, Container>& q1, SafeQueue<T, Container>& q2)
{
-  q1.swap (q2);
+void swap(SafeQueue<T, Container>& q1, SafeQueue<T, Container>& q2) {
+  q1.swap(q2);
 }
-#endif // SINGA_UTILS_SAFE_QUEUE_H_
+
+#endif  // SINGA_UTILS_SAFE_QUEUE_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/comm/msg.cc
----------------------------------------------------------------------
diff --git a/src/comm/msg.cc b/src/comm/msg.cc
index 94f3074..8128b46 100644
--- a/src/comm/msg.cc
+++ b/src/comm/msg.cc
@@ -128,7 +128,10 @@ int Msg::FrameSize() {
 }
 
 char* Msg::FrameStr() {
-  return static_cast<char*>(frames_.at(idx_).first);
+  char* ret = new char[frames_.at(idx_).second];
+  memcpy(ret, static_cast<char*>(frames_.at(idx_).first), 
+        frames_.at(idx_).second);
+  return ret;
 }
 
 void* Msg::FrameData() {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/comm/socket.cc
----------------------------------------------------------------------
diff --git a/src/comm/socket.cc b/src/comm/socket.cc
index aa1ee85..9afc54c 100644
--- a/src/comm/socket.cc
+++ b/src/comm/socket.cc
@@ -23,7 +23,7 @@
 #include <glog/logging.h>
 
 namespace singa {
-const int TIME_OUT = 2; // max blocking time in milliseconds.
+const int TIME_OUT = 2;  // max blocking time in milliseconds.
 std::unordered_map<int, SafeQueue<Msg*>> msgQueues;
 Dealer::~Dealer() {
 #ifdef USE_ZMQ
@@ -68,7 +68,7 @@ int Dealer::Send(Msg** msg) {
 Msg* Dealer::Receive(int timeout) {
   Msg* msg = nullptr;
   if (timeout > 0) {
-    if(!msgQueues.at(id_).timeout_pop(msg, timeout))
+    if (!msgQueues.at(id_).timeout_pop(msg, timeout))
       return nullptr;
   } else {
     msgQueues.at(id_).pop(msg);

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/stub.cc
----------------------------------------------------------------------
diff --git a/src/stub.cc b/src/stub.cc
index c7658fc..4bc8c3d 100644
--- a/src/stub.cc
+++ b/src/stub.cc
@@ -48,8 +48,9 @@ void Stub::Setup() {
     const string hostip = cluster->hostip();
     int port = router_->Bind("tcp://" + hostip + ":*");
     endpoint_ = hostip + ":" + std::to_string(port);
-  } else
+  } else {
     endpoint_ = "localhost";
+  }
 }
 /**
  * Get a hash id for a Param object from a group.

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/914c1e72/src/test/test_connection_layers.cc
----------------------------------------------------------------------
diff --git a/src/test/test_connection_layers.cc b/src/test/test_connection_layers.cc
index 6529840..cd7f5f5 100644
--- a/src/test/test_connection_layers.cc
+++ b/src/test/test_connection_layers.cc
@@ -118,11 +118,15 @@ TEST(ConnectionLayerTest, BridgeTest) {
   ASSERT_EQ(dst.data(nullptr).shape(0), N);
   ASSERT_EQ(dst.data(nullptr).shape(1), M);
 
+  msgQueues[-1];
+  msgQueues[Addr(0, 0, kWorkerLayer)];
+
   // bind bridges to socket
-  Router router(N);
-  router.Bind("inproc://router");
-  Dealer dealer(0);
-  dealer.Connect("inproc://router");
+  // Router router(N);
+  Router router;
+  // router.Bind("inproc://router");
+  Dealer dealer(Addr(0, 0, kWorkerLayer));
+  // dealer.Connect("inproc://router");
   std::unordered_map<std::string, Layer*> name2bridge;
   name2bridge[src.name()] = &src;
   name2bridge[dst.name()] = &dst;


Mime
View raw message