arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-1265: [Plasma] Clean up all resources on SIGTERM to keep valgrind output clean
Date Tue, 01 Aug 2017 05:05:20 GMT
Repository: arrow
Updated Branches:
  refs/heads/master b5ff2f6b4 -> 3a84653a3


ARROW-1265: [Plasma] Clean up all resources on SIGTERM to keep valgrind output clean

@pcmoritz this may be a bit overzealous, since all of the valgrind warnings were for memory
that was still reachable, but let me know what you think.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #921 from wesm/ARROW-1265 and squashes the following commits:

ac2e8309 [Wes McKinney] Consistent function naming style for EventLoop methods
22f440f4 [Wes McKinney] clang-format
97dbd16f [Wes McKinney] Clean up all resources on SIGTERM to keep valgrind output clean


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/3a84653a
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/3a84653a
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/3a84653a

Branch: refs/heads/master
Commit: 3a84653a3aa00f36f6312a11e58d1daf41dedcee
Parents: b5ff2f6
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Tue Aug 1 01:05:16 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Tue Aug 1 01:05:16 2017 -0400

----------------------------------------------------------------------
 cpp/src/plasma/events.cc | 26 +++++++------
 cpp/src/plasma/events.h  | 19 ++++++----
 cpp/src/plasma/store.cc  | 87 +++++++++++++++++++++++++++++--------------
 cpp/src/plasma/store.h   |  6 ++-
 4 files changed, 90 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/3a84653a/cpp/src/plasma/events.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc
index f98ced2..4e4ecfa 100644
--- a/cpp/src/plasma/events.cc
+++ b/cpp/src/plasma/events.cc
@@ -21,13 +21,12 @@
 
 namespace plasma {
 
-void EventLoop::file_event_callback(aeEventLoop* loop, int fd, void* context,
-                                    int events) {
+void EventLoop::FileEventCallback(aeEventLoop* loop, int fd, void* context, int events) {
   FileCallback* callback = reinterpret_cast<FileCallback*>(context);
   (*callback)(events);
 }
 
-int EventLoop::timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* context) {
+int EventLoop::TimerEventCallback(aeEventLoop* loop, TimerID timer_id, void* context) {
   TimerCallback* callback = reinterpret_cast<TimerCallback*>(context);
   return (*callback)(timer_id);
 }
@@ -36,21 +35,21 @@ constexpr int kInitialEventLoopSize = 1024;
 
 EventLoop::EventLoop() { loop_ = aeCreateEventLoop(kInitialEventLoopSize); }
 
-bool EventLoop::add_file_event(int fd, int events, const FileCallback& callback) {
+bool EventLoop::AddFileEvent(int fd, int events, const FileCallback& callback) {
   if (file_callbacks_.find(fd) != file_callbacks_.end()) {
     return false;
   }
   auto data = std::unique_ptr<FileCallback>(new FileCallback(callback));
   void* context = reinterpret_cast<void*>(data.get());
   // Try to add the file descriptor.
-  int err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context);
+  int err = aeCreateFileEvent(loop_, fd, events, EventLoop::FileEventCallback, context);
   // If it cannot be added, increase the size of the event loop.
   if (err == AE_ERR && errno == ERANGE) {
     err = aeResizeSetSize(loop_, 3 * aeGetSetSize(loop_) / 2);
     if (err != AE_OK) {
       return false;
     }
-    err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context);
+    err = aeCreateFileEvent(loop_, fd, events, EventLoop::FileEventCallback, context);
   }
   // In any case, test if there were errors.
   if (err == AE_OK) {
@@ -60,23 +59,28 @@ bool EventLoop::add_file_event(int fd, int events, const FileCallback& callback)
   return false;
 }
 
-void EventLoop::remove_file_event(int fd) {
+void EventLoop::RemoveFileEvent(int fd) {
   aeDeleteFileEvent(loop_, fd, AE_READABLE | AE_WRITABLE);
   file_callbacks_.erase(fd);
 }
 
-void EventLoop::run() { aeMain(loop_); }
+void EventLoop::Start() { aeMain(loop_); }
 
-int64_t EventLoop::add_timer(int64_t timeout, const TimerCallback& callback) {
+void EventLoop::Stop() {
+  aeStop(loop_);
+  aeDeleteEventLoop(loop_);
+}
+
+int64_t EventLoop::AddTimer(int64_t timeout, const TimerCallback& callback) {
   auto data = std::unique_ptr<TimerCallback>(new TimerCallback(callback));
   void* context = reinterpret_cast<void*>(data.get());
   int64_t timer_id =
-      aeCreateTimeEvent(loop_, timeout, EventLoop::timer_event_callback, context, NULL);
+      aeCreateTimeEvent(loop_, timeout, EventLoop::TimerEventCallback, context, NULL);
   timer_callbacks_.emplace(timer_id, std::move(data));
   return timer_id;
 }
 
-int EventLoop::remove_timer(int64_t timer_id) {
+int EventLoop::RemoveTimer(int64_t timer_id) {
   int err = aeDeleteTimeEvent(loop_, timer_id);
   timer_callbacks_.erase(timer_id);
   return err;

http://git-wip-us.apache.org/repos/asf/arrow/blob/3a84653a/cpp/src/plasma/events.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h
index 6cb5b73..4291484 100644
--- a/cpp/src/plasma/events.h
+++ b/cpp/src/plasma/events.h
@@ -61,13 +61,13 @@ class EventLoop {
   /// @param events The flags for events we are listening to (read or write).
   /// @param callback The callback that will be called when the event happens.
   /// @return Returns true if the event handler was added successfully.
-  bool add_file_event(int fd, int events, const FileCallback& callback);
+  bool AddFileEvent(int fd, int events, const FileCallback& callback);
 
   /// Remove a file event handler from the event loop.
   ///
   /// @param fd The file descriptor of the event handler.
   /// @return Void.
-  void remove_file_event(int fd);
+  void RemoveFileEvent(int fd);
 
   /// Register a handler that will be called after a time slice of
   ///  "timeout" milliseconds.
@@ -75,23 +75,26 @@ class EventLoop {
   ///  @param timeout The timeout in milliseconds.
   ///  @param callback The callback for the timeout.
   ///  @return The ID of the newly created timer.
-  int64_t add_timer(int64_t timeout, const TimerCallback& callback);
+  int64_t AddTimer(int64_t timeout, const TimerCallback& callback);
 
   /// Remove a timer handler from the event loop.
   ///
   /// @param timer_id The ID of the timer that is to be removed.
   /// @return The ae.c error code. TODO(pcm): needs to be standardized
-  int remove_timer(int64_t timer_id);
+  int RemoveTimer(int64_t timer_id);
 
-  /// Run the event loop.
+  /// \brief Run the event loop.
   ///
   /// @return Void.
-  void run();
+  void Start();
+
+  /// \brief Stop the event loop
+  void Stop();
 
  private:
-  static void file_event_callback(aeEventLoop* loop, int fd, void* context, int events);
+  static void FileEventCallback(aeEventLoop* loop, int fd, void* context, int events);
 
-  static int timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* context);
+  static int TimerEventCallback(aeEventLoop* loop, TimerID timer_id, void* context);
 
   aeEventLoop* loop_;
   std::unordered_map<int, std::unique_ptr<FileCallback>> file_callbacks_;

http://git-wip-us.apache.org/repos/asf/arrow/blob/3a84653a/cpp/src/plasma/store.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index a9425b6..9f4b98c 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -260,7 +260,7 @@ void PlasmaStore::return_from_get(GetRequest* get_req) {
   }
   // Remove the get request.
   if (get_req->timer != -1) {
-    ARROW_CHECK(loop_->remove_timer(get_req->timer) == AE_OK);
+    ARROW_CHECK(loop_->RemoveTimer(get_req->timer) == AE_OK);
   }
   delete get_req;
 }
@@ -330,7 +330,7 @@ void PlasmaStore::process_get_request(Client* client,
   } else if (timeout_ms != -1) {
     // Set a timer that will cause the get request to return to the client. Note
     // that a timeout of -1 is used to indicate that no timer should be set.
-    get_req->timer = loop_->add_timer(timeout_ms, [this, get_req](int64_t timer_id) {
+    get_req->timer = loop_->AddTimer(timeout_ms, [this, get_req](int64_t timer_id) {
       return_from_get(get_req);
       return kEventLoopTimerDone;
     });
@@ -412,11 +412,13 @@ void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
 
 void PlasmaStore::connect_client(int listener_sock) {
   int client_fd = AcceptClient(listener_sock);
-  // This is freed in disconnect_client.
+
   Client* client = new Client(client_fd);
+  connected_clients_[client_fd] = std::unique_ptr<Client>(client);
+
   // Add a callback to handle events on this socket.
   // TODO(pcm): Check return value.
-  loop_->add_file_event(client_fd, kEventLoopRead, [this, client](int events) {
+  loop_->AddFileEvent(client_fd, kEventLoopRead, [this, client](int events) {
     Status s = process_message(client);
     if (!s.ok()) {
       ARROW_LOG(FATAL) << "Failed to process file event: " << s;
@@ -425,23 +427,25 @@ void PlasmaStore::connect_client(int listener_sock) {
   ARROW_LOG(DEBUG) << "New connection with fd " << client_fd;
 }
 
-void PlasmaStore::disconnect_client(Client* client) {
-  ARROW_CHECK(client != NULL);
-  ARROW_CHECK(client->fd > 0);
-  loop_->remove_file_event(client->fd);
+void PlasmaStore::disconnect_client(int client_fd) {
+  ARROW_CHECK(client_fd > 0);
+  auto it = connected_clients_.find(client_fd);
+  ARROW_CHECK(it != connected_clients_.end());
+  loop_->RemoveFileEvent(client_fd);
   // Close the socket.
-  close(client->fd);
-  ARROW_LOG(INFO) << "Disconnecting client on fd " << client->fd;
+  close(client_fd);
+  ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
   // If this client was using any objects, remove it from the appropriate
   // lists.
   for (const auto& entry : store_info_.objects) {
-    remove_client_from_object_clients(entry.second.get(), client);
+    remove_client_from_object_clients(entry.second.get(), it->second.get());
   }
+
   // Note, the store may still attempt to send a message to the disconnected
   // client (for example, when an object ID that the client was waiting for
   // is ready). In these cases, the attempt to send the message will fail, but
   // the store should just ignore the failure.
-  delete client;
+  connected_clients_.erase(it);
 }
 
 /// Send notifications about sealed objects to the subscribers. This is called
@@ -478,7 +482,7 @@ void PlasmaStore::send_notifications(int client_fd) {
       // at the end of the method.
       // TODO(pcm): Introduce status codes and check in case the file descriptor
       // is added twice.
-      loop_->add_file_event(client_fd, kEventLoopWrite, [this, client_fd](int events) {
+      loop_->AddFileEvent(client_fd, kEventLoopWrite, [this, client_fd](int events) {
         send_notifications(client_fd);
       });
       break;
@@ -507,7 +511,7 @@ void PlasmaStore::send_notifications(int client_fd) {
 
   // If we have sent all notifications, remove the fd from the event loop.
   if (it->second.object_notifications.empty()) {
-    loop_->remove_file_event(client_fd);
+    loop_->RemoveFileEvent(client_fd);
   }
 }
 
@@ -616,7 +620,7 @@ Status PlasmaStore::process_message(Client* client) {
     } break;
     case DISCONNECT_CLIENT:
       ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
-      disconnect_client(client);
+      disconnect_client(client->fd);
       break;
     default:
       // This code should be unreachable.
@@ -625,9 +629,43 @@ Status PlasmaStore::process_message(Client* client) {
   return Status::OK();
 }
 
-// Report "success" to valgrind.
-void signal_handler(int signal) {
+class PlasmaStoreRunner {
+ public:
+  PlasmaStoreRunner() {}
+
+  void Start(char* socket_name, int64_t system_memory) {
+    // Create the event loop.
+    loop_.reset(new EventLoop);
+    store_.reset(new PlasmaStore(loop_.get(), system_memory));
+    int socket = bind_ipc_sock(socket_name, true);
+    // TODO(pcm): Check return value.
+    ARROW_CHECK(socket >= 0);
+
+    loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) {
+      this->store_->connect_client(socket);
+    });
+    loop_->Start();
+  }
+
+  void Shutdown() {
+    loop_->Stop();
+    loop_ = nullptr;
+    store_ = nullptr;
+  }
+
+ private:
+  std::unique_ptr<EventLoop> loop_;
+  std::unique_ptr<PlasmaStore> store_;
+};
+
+static PlasmaStoreRunner* g_runner = nullptr;
+
+void HandleSignal(int signal) {
   if (signal == SIGTERM) {
+    if (g_runner != nullptr) {
+      g_runner->Shutdown();
+    }
+    // Report "success" to valgrind.
     exit(0);
   }
 }
@@ -636,21 +674,16 @@ void start_server(char* socket_name, int64_t system_memory) {
   // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
   // to a client that has already died, the store could die.
   signal(SIGPIPE, SIG_IGN);
-  // Create the event loop.
-  EventLoop loop;
-  PlasmaStore store(&loop, system_memory);
-  int socket = bind_ipc_sock(socket_name, true);
-  ARROW_CHECK(socket >= 0);
-  // TODO(pcm): Check return value.
-  loop.add_file_event(socket, kEventLoopRead,
-                      [&store, socket](int events) { store.connect_client(socket); });
-  loop.run();
+
+  PlasmaStoreRunner runner;
+  g_runner = &runner;
+  signal(SIGTERM, HandleSignal);
+  runner.Start(socket_name, system_memory);
 }
 
 }  // namespace plasma
 
 int main(int argc, char* argv[]) {
-  signal(SIGTERM, plasma::signal_handler);
   char* socket_name = NULL;
   int64_t system_memory = -1;
   int c;

http://git-wip-us.apache.org/repos/asf/arrow/blob/3a84653a/cpp/src/plasma/store.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index fec25c1..fb732a1 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -127,9 +127,9 @@ class PlasmaStore {
 
   /// Disconnect a client from the PlasmaStore.
   ///
-  /// @param client The client that is disconnected.
+  /// @param client_fd The client file descriptor that is disconnected.
   /// @return Void.
-  void disconnect_client(Client* client);
+  void disconnect_client(int client_fd);
 
   void send_notifications(int client_fd);
 
@@ -166,6 +166,8 @@ class PlasmaStore {
   /// TODO(pcm): Consider putting this into the Client data structure and
   /// reorganize the code slightly.
   std::unordered_map<int, NotificationQueue> pending_notifications_;
+
+  std::unordered_map<int, std::unique_ptr<Client>> connected_clients_;
 };
 
 }  // namespace plasma


Mime
View raw message