incubator-mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject svn commit: r1236485 [7/7] - in /incubator/mesos/trunk: ./ include/mesos/ src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/ src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/ third_party/libprocess/include/pr...
Date Fri, 27 Jan 2012 01:25:15 GMT
Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Fri Jan 27 01:25:13 2012
@@ -1,28 +1,88 @@
+#include <arpa/inet.h>
+
 #include <gmock/gmock.h>
 
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
 #include <string>
+#include <sstream>
 
-#include <process/async.hpp>
+#include <process/clock.hpp>
+#include <process/defer.hpp>
 #include <process/dispatch.hpp>
-#include <process/latch.hpp>
+#include <process/executor.hpp>
+#include <process/filter.hpp>
+#include <process/future.hpp>
+#include <process/gc.hpp>
 #include <process/process.hpp>
 #include <process/run.hpp>
 #include <process/timer.hpp>
 
+#include "encoder.hpp"
+#include "thread.hpp"
+
 // Definition of a Set action to be used with gmock.
 ACTION_P2(Set, variable, value) { *variable = value; }
 
 using namespace process;
 
 using testing::_;
+using testing::Return;
 using testing::ReturnArg;
 
 
-class SpawnMockProcess : public Process<SpawnMockProcess>
+TEST(libprocess, thread)  // Exercises ThreadLocal<ProcessBase>: store a pointer, compare it, reset it to NULL.
+{
+  pthread_key_t key;
+  ASSERT_EQ(0, pthread_key_create(&key, NULL));  // NULL: no per-thread destructor is registered for the key.
+
+  ThreadLocal<ProcessBase>* _process_ = new ThreadLocal<ProcessBase>(key);
+
+  ProcessBase* process = new ProcessBase();  // NOTE(review): a failing ASSERT_* below returns early and skips the deletes at the end.
+
+  ASSERT_TRUE(*(_process_) == NULL);  // Nothing has been assigned through the wrapper yet.
+
+  (*_process_) = process;  // Goes through ThreadLocal::operator = (T*).
+
+  ASSERT_TRUE(*(_process_) == process);
+  ASSERT_FALSE(*(_process_) == NULL);
+
+  (*_process_) = NULL;  // Clear the slot again before the objects are destroyed.
+
+  ASSERT_TRUE(*(_process_) == NULL);
+
+  delete process;
+  delete _process_;
+
+  ASSERT_EQ(0, pthread_key_delete(key));  // The test created the key, so the test deletes it.
+}
+
+
+TEST(libprocess, event)  // Checks Event::is<T>() on an event constructed as a TerminateEvent.
+{
+  Event* event = new TerminateEvent(UPID());  // Held through the Event base pointer; built from a default-constructed UPID.
+  EXPECT_FALSE(event->is<MessageEvent>());
+  EXPECT_FALSE(event->is<ExitedEvent>());
+  EXPECT_TRUE(event->is<TerminateEvent>());  // Only the type it was constructed as is expected to match.
+  delete event;  // EXPECT_* does not return early, so this delete is always reached.
+}
+
+
+TEST(libprocess, future)  // Smoke test: a value set on a Promise is readable through its Future.
+{
+  Promise<bool> promise;
+  promise.set(true);  // The value is set before anything waits on the future.
+  promise.future().await();  // The result of await() is not checked here.
+  EXPECT_TRUE(promise.future().get());
+}
+
+
+class SpawnProcess : public Process<SpawnProcess>
 {
 public:
-  MOCK_METHOD0(__operator_call__, void());
-  virtual void operator () () { __operator_call__(); }
+  MOCK_METHOD0(initialize, void(void));
+  MOCK_METHOD0(finalize, void(void));
 };
 
 
@@ -30,27 +90,30 @@ TEST(libprocess, spawn)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  SpawnMockProcess process;
+  SpawnProcess process;
+
+  EXPECT_CALL(process, initialize())
+    .Times(1);
 
-  EXPECT_CALL(process, __operator_call__())
+  EXPECT_CALL(process, finalize())
     .Times(1);
 
-  PID<SpawnMockProcess> pid = spawn(&process);
+  PID<SpawnProcess> pid = spawn(process);
 
   ASSERT_FALSE(!pid);
 
+  terminate(pid);
   wait(pid);
 }
 
 
-class DispatchMockProcess : public Process<DispatchMockProcess>
+class DispatchProcess : public Process<DispatchProcess>
 {
 public:
   MOCK_METHOD0(func0, void());
   MOCK_METHOD1(func1, bool(bool));
-  MOCK_METHOD1(func2, Promise<bool>(bool));
+  MOCK_METHOD1(func2, Future<bool>(bool));
   MOCK_METHOD1(func3, int(int));
-  MOCK_METHOD1(func4, Promise<int>(int));
 };
 
 
@@ -58,7 +121,7 @@ TEST(libprocess, dispatch)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  DispatchMockProcess process;
+  DispatchProcess process;
 
   EXPECT_CALL(process, func0())
     .Times(1);
@@ -69,67 +132,73 @@ TEST(libprocess, dispatch)
   EXPECT_CALL(process, func2(_))
     .WillOnce(ReturnArg<0>());
 
-  PID<DispatchMockProcess> pid = spawn(&process);
+  PID<DispatchProcess> pid = spawn(&process);
 
   ASSERT_FALSE(!pid);
 
-  dispatch(pid, &DispatchMockProcess::func0);
+  dispatch(pid, &DispatchProcess::func0);
 
   Future<bool> future;
 
-  future = dispatch(pid, &DispatchMockProcess::func1, true);
+  future = dispatch(pid, &DispatchProcess::func1, true);
 
   EXPECT_TRUE(future.get());
   
-  future = dispatch(pid, &DispatchMockProcess::func2, true);
+  future = dispatch(pid, &DispatchProcess::func2, true);
 
   EXPECT_TRUE(future.get());
 
-  post(pid, TERMINATE);
+  terminate(pid);
   wait(pid);
 }
 
 
-TEST(libprocess, call)
+TEST(libprocess, defer)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  DispatchMockProcess process;
+  DispatchProcess process;
 
-  EXPECT_CALL(process, func3(_))
+  EXPECT_CALL(process, func1(_))
     .WillOnce(ReturnArg<0>());
 
-  EXPECT_CALL(process, func4(_))
+  EXPECT_CALL(process, func2(_))
     .WillOnce(ReturnArg<0>());
 
-  PID<DispatchMockProcess> pid = spawn(&process);
+  PID<DispatchProcess> pid = spawn(&process);
 
   ASSERT_FALSE(!pid);
 
-  int result;
+  deferred<Future<bool>(void)> func1 =
+    defer(pid, &DispatchProcess::func1, true);
+
+  Future<bool> future;
 
-  result = call(pid, &DispatchMockProcess::func3, 42);
+  future = func1();
 
-  EXPECT_EQ(42, result);
+  EXPECT_TRUE(future.get());
+
+  deferred<Future<bool>(void)> func2 =
+    defer(pid, &DispatchProcess::func2, true);
   
-  result = call(pid, &DispatchMockProcess::func4, 43);
+  future = func2();
 
-  EXPECT_EQ(43, result);
+  EXPECT_TRUE(future.get());
 
-  post(pid, TERMINATE);
+  terminate(pid);
   wait(pid);
 }
 
 
-class HandlersMockProcess : public Process<HandlersMockProcess>
+class HandlersProcess : public Process<HandlersProcess>
 {
 public:
-  HandlersMockProcess()
+  HandlersProcess()
   {
-    installMessageHandler("func", &HandlersMockProcess::func);
+    install("func", &HandlersProcess::func);
   }
 
-  MOCK_METHOD0(func, void());
+  MOCK_METHOD2(func, void(const UPID&, const std::string&));
 };
 
 
@@ -137,23 +206,23 @@ TEST(libprocess, handlers)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  HandlersMockProcess process;
+  HandlersProcess process;
 
-  EXPECT_CALL(process, func())
+  EXPECT_CALL(process, func(_, _))
     .Times(1);
 
-  PID<HandlersMockProcess> pid = spawn(&process);
+  PID<HandlersProcess> pid = spawn(&process);
 
   ASSERT_FALSE(!pid);
 
   post(pid, "func");
 
-  post(pid, TERMINATE);
+  terminate(pid, false);
   wait(pid);
 }
 
 
-class BaseMockProcess : public Process<BaseMockProcess>
+class BaseProcess : public Process<BaseProcess>
 {
 public:
   virtual void func() = 0;
@@ -161,10 +230,10 @@ public:
 };
 
 
-class DerivedMockProcess : public BaseMockProcess
+class DerivedProcess : public BaseProcess
 {
 public:
-  DerivedMockProcess() {}
+  DerivedProcess() {}
   MOCK_METHOD0(func, void());
 };
 
@@ -173,7 +242,7 @@ TEST(libprocess, inheritance)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  DerivedMockProcess process;
+  DerivedProcess process;
 
   EXPECT_CALL(process, func())
     .Times(2);
@@ -181,21 +250,21 @@ TEST(libprocess, inheritance)
   EXPECT_CALL(process, foo())
     .Times(1);
 
-  PID<DerivedMockProcess> pid1 = spawn(&process);
+  PID<DerivedProcess> pid1 = spawn(&process);
 
   ASSERT_FALSE(!pid1);
 
-  dispatch(pid1, &DerivedMockProcess::func);
+  dispatch(pid1, &DerivedProcess::func);
 
-  PID<BaseMockProcess> pid2(process);
-  PID<BaseMockProcess> pid3 = pid1;
+  PID<BaseProcess> pid2(process);
+  PID<BaseProcess> pid3 = pid1;
 
   ASSERT_EQ(pid2, pid3);
 
-  dispatch(pid3, &BaseMockProcess::func);
-  dispatch(pid3, &BaseMockProcess::foo);
+  dispatch(pid3, &BaseProcess::func);
+  dispatch(pid3, &BaseProcess::foo);
 
-  post(pid1, TERMINATE);
+  terminate(pid1, false);
   wait(pid1);
 }
 
@@ -238,10 +307,10 @@ class DelegateeProcess : public Process<
 public:
   DelegateeProcess()
   {
-    installMessageHandler("func", &DelegateeProcess::func);
+    install("func", &DelegateeProcess::func);
   }
 
-  MOCK_METHOD0(func, void());
+  MOCK_METHOD2(func, void(const UPID&, const std::string&));
 };
 
 
@@ -252,7 +321,7 @@ TEST(libprocess, delegate)
   DelegateeProcess delegatee;
   DelegatorProcess delegator(delegatee.self());
 
-  EXPECT_CALL(delegatee, func())
+  EXPECT_CALL(delegatee, func(_, _))
     .Times(1);
 
   spawn(&delegator);
@@ -260,89 +329,204 @@ TEST(libprocess, delegate)
 
   post(delegator.self(), "func");
 
-  post(delegator.self(), TERMINATE);
-  post(delegatee.self(), TERMINATE);
+  terminate(delegator, false);
+  wait(delegator);
 
-  wait(delegator.self());
-  wait(delegatee.self());
+  terminate(delegatee, false);
+  wait(delegatee);
 }
 
 
-class TerminateProcess : public Process<TerminateProcess>
-{
-public:
-  TerminateProcess(Latch* _latch) : latch(_latch) {}
+// class TerminateProcess : public Process<TerminateProcess>
+// {
+// public:
+//   TerminateProcess(Latch* _latch) : latch(_latch) {}
 
-protected:
-  virtual void operator () ()
-  {
-    latch->await();
-    receive();
-    EXPECT_EQ(TERMINATE, name());
-  }
+// protected:
+//   virtual void operator () ()
+//   {
+//     latch->await();
+//     receive();
+//     EXPECT_EQ(TERMINATE, name());
+//   }
 
-private:
-  Latch* latch;
-};
+// private:
+//   Latch* latch;
+// };
 
 
-TEST(libprocess, terminate)
-{
-  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+// TEST(libprocess, terminate)
+// {
+//   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
-  Latch latch;
+//   Latch latch;
 
-  TerminateProcess process(&latch);
+//   TerminateProcess process(&latch);
 
-  spawn(&process);
+//   spawn(&process);
 
-  post(process.self(), "one");
-  post(process.self(), "two");
-  post(process.self(), "three");
+//   post(process.self(), "one");
+//   post(process.self(), "two");
+//   post(process.self(), "three");
 
-  terminate(process.self());
+//   terminate(process.self());
 
-  latch.trigger();
+//   latch.trigger();
   
-  wait(process.self());
-}
+//   wait(process.self());
+// }
 
 
 class TimeoutProcess : public Process<TimeoutProcess>
 {
 public:
-  TimeoutProcess() {}
   MOCK_METHOD0(timeout, void());
 };
 
 
-TEST(libprocess, DISABLED_timer)
+TEST(libprocess, delay)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
   Clock::pause();
 
+  volatile bool timeoutCalled = false;
+
   TimeoutProcess process;
 
   EXPECT_CALL(process, timeout())
-    .Times(1);
+    .WillOnce(Set(&timeoutCalled, true));
+
+  spawn(process);
 
-  spawn(&process);
+  double seconds = 5.0;
 
-  double timeout = 5.0;
+  delay(seconds, process.self(), &TimeoutProcess::timeout);
 
-  Timer timer =
-    delay(timeout, process.self(), &TimeoutProcess::timeout);
+  Clock::advance(seconds);
 
-  Clock::advance(timeout);
+  while (!timeoutCalled);
 
-  post(process.self(), TERMINATE);
-  wait(process.self());
+  terminate(process);
+  wait(process);
 
   Clock::resume();
 }
 
 
+class OrderProcess : public Process<OrderProcess>  // Helper for the 'order' test: relays a call to a TimeoutProcess.
+{
+public:
+  void order(const PID<TimeoutProcess>& pid)  // pid: the TimeoutProcess whose timeout() should be invoked.
+  {
+    // TODO(benh): Add a test which uses 'send' instead of dispatch.
+    dispatch(pid, &TimeoutProcess::timeout);  // Dispatches timeout() on 'pid'; nothing is returned to the caller.
+  }
+};
+
+
+TEST(libprocess, order)  // Checks Clock::now(&process1) before and after process1 handles a call dispatched by another process.
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Clock::pause();  // NOTE(review): a failing ASSERT_* would return before the Clock::resume() at the end.
+
+  TimeoutProcess process1;
+
+  volatile bool timeoutCalled = false;  // Set by the mock action below and polled by this test.
+
+  EXPECT_CALL(process1, timeout())
+    .WillOnce(Set(&timeoutCalled, true));
+
+  spawn(process1);
+
+  double now = Clock::now(&process1);  // Reading taken with process1 as the argument, before advancing.
+
+  double seconds = 1.0;
+
+  Clock::advance(1.0);  // NOTE(review): literal duplicates 'seconds'; the check further down assumes they are equal.
+
+  EXPECT_EQ(now, Clock::now(&process1));  // Expects process1's reading to be unchanged right after the advance.
+
+  OrderProcess process2;
+  spawn(process2);  // process2 is spawned after the clock was advanced.
+
+  dispatch(process2, &OrderProcess::order, process1.self());  // process2 in turn dispatches timeout() on process1.
+
+  while (!timeoutCalled);  // NOTE(review): spin-wait; volatile is not a thread-synchronization primitive.
+
+  EXPECT_EQ(now + seconds, Clock::now(&process1));  // NOTE(review): exact equality on doubles.
+
+  terminate(process1);
+  wait(process1);
+
+  terminate(process2);
+  wait(process2);
+
+  Clock::resume();
+}
+
+
+class DonateProcess : public Process<DonateProcess>  // Helper for the 'donate' test.
+{
+public:
+  void donate()  // Runs a complete spawn/terminate/wait cycle on a second DonateProcess.
+  {
+    DonateProcess process;  // Local instance; it goes out of scope only after wait() below has returned.
+    spawn(process);
+    terminate(process);
+    wait(process);  // Blocks inside this method; presumably the point is that waiting from within a process must not hang - confirm.
+  }
+};
+
+
+TEST(libprocess, donate)  // Dispatches DonateProcess::donate and then shuts the process down.
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  DonateProcess process;
+  spawn(process);
+
+  dispatch(process, &DonateProcess::donate);  // No assertion follows; the test passes if the waits below return.
+
+  terminate(process, false);  // NOTE(review): presumably 'false' lets the queued donate() run before termination - confirm.
+  wait(process);
+}
+
+
+class ExitedProcess : public Process<ExitedProcess>  // Helper for the 'exited' test: links to a pid and mocks exited().
+{
+public:
+  ExitedProcess(const UPID& pid) { link(pid); }  // Links to 'pid' at construction time, before the process is spawned.
+
+  MOCK_METHOD1(exited, void(const UPID&));  // Mocked so the test can observe the call and its argument.
+};
+
+
+TEST(libprocess, exited)  // Expects exited(pid) on a process linked to 'pid' once 'pid' is terminated.
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  UPID pid = spawn(new ProcessBase(), true);  // NOTE(review): no matching delete here; presumably 'true' hands cleanup to libprocess - confirm.
+
+  ExitedProcess process(pid);  // The constructor links to 'pid'.
+
+  volatile bool exitedCalled = false;  // Set by the mock action below and polled by this test.
+
+  EXPECT_CALL(process, exited(pid))
+    .WillOnce(Set(&exitedCalled, true));
+  // The linked process is spawned only after the expectation has been set.
+  spawn(process);
+
+  terminate(pid);
+
+  while (!exitedCalled);  // NOTE(review): spin-wait; volatile is not a thread-synchronization primitive.
+
+  terminate(process);
+  wait(process);
+}
+
+
 TEST(libprocess, select)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
@@ -360,11 +544,11 @@ TEST(libprocess, select)
 
   promise1.set(42);
 
-  Option<Future<int> > option = select(futures, 0);
+  Future<int> future = select(futures);
 
-  EXPECT_TRUE(option.isSome());
-  EXPECT_TRUE(option.get().ready());
-  EXPECT_EQ(42, option.get().get());
+  EXPECT_TRUE(future.await());
+  EXPECT_TRUE(future.isReady());
+  EXPECT_EQ(42, future.get());
 }
 
 
@@ -467,7 +651,7 @@ public:
 };
 
 
-TEST(libprocess, Async)
+TEST(libprocess, executor)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);
 
@@ -482,19 +666,19 @@ TEST(libprocess, Async)
   EXPECT_CALL(receiver, event2("event2"))
     .WillOnce(Set(&event2Called, true));
 
-  async::Dispatch dispatch;
+  Executor executor;
 
-  async::dispatch<void(int)> event1 =
-    dispatch(std::tr1::bind(&EventReceiver::event1,
-                            &receiver,
-                            std::tr1::placeholders::_1));
+  deferred<void(int)> event1 =
+    executor.defer(std::tr1::bind(&EventReceiver::event1,
+                                  &receiver,
+                                  std::tr1::placeholders::_1));
 
   event1(42);
 
-  async::dispatch<void(const std::string&)> event2 =
-    dispatch(std::tr1::bind(&EventReceiver::event2,
-                            &receiver,
-                            std::tr1::placeholders::_1));
+  deferred<void(const std::string&)> event2 =
+    executor.defer(std::tr1::bind(&EventReceiver::event2,
+                                  &receiver,
+                                  std::tr1::placeholders::_1));
 
   event2("event2");
 
@@ -503,6 +687,122 @@ TEST(libprocess, Async)
 }
 
 
+class RemoteProcess : public Process<RemoteProcess>  // Helper for the 'remote' test: installs a mocked handler named "handler".
+{
+public:
+  RemoteProcess()
+  {
+    install("handler", &RemoteProcess::handler);  // Registered under the same name the test puts in Message::name.
+  }
+
+  MOCK_METHOD2(handler, void(const UPID&, const std::string&));  // Mocked so the test can observe the call.
+};
+
+
+TEST(libprocess, remote)  // Sends an encoded Message over a raw TCP socket and expects the installed handler to be called.
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  RemoteProcess process;
+
+  volatile bool handlerCalled = false;  // Set by the mock action below and polled by this test.
+
+  EXPECT_CALL(process, handler(_, _))
+    .WillOnce(Set(&handlerCalled, true));
+
+  spawn(process);
+
+  int s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);  // NOTE(review): a failing ASSERT_* below returns without close(s).
+
+  ASSERT_LE(0, s);
+
+  sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));  // NOTE(review): <string.h> is not among the includes shown in this diff; presumably pulled in transitively.
+  addr.sin_family = PF_INET;  // NOTE(review): socket() above uses AF_INET; AF_INET is the usual constant for sin_family.
+  addr.sin_port = htons(process.self().port);
+  addr.sin_addr.s_addr = process.self().ip;  // No byte-order conversion here; assumes 'ip' is already in network order - confirm.
+
+  ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
+
+  Message message;
+  message.name = "handler";  // Matches the name passed to install() in RemoteProcess.
+  message.from = UPID();  // Default-constructed sender.
+  message.to = process.self();
+
+  const std::string& data = MessageEncoder::encode(&message);
+
+  ASSERT_EQ(data.size(), write(s, data.data(), data.size()));  // NOTE(review): size_t vs ssize_t comparison; a short write fails the test.
+
+  ASSERT_EQ(0, close(s));  // The socket is closed before waiting for the handler.
+
+  while (!handlerCalled);  // NOTE(review): spin-wait; volatile is not a thread-synchronization primitive.
+
+  terminate(process);
+  wait(process);
+}
+
+
+class HttpProcess : public Process<HttpProcess>  // Helper for the 'http' test: routes "handler" to a mocked method.
+{
+public:
+  HttpProcess()
+  {
+    route("handler", &HttpProcess::handler);  // The test requests the path "/<process id>/handler".
+  }
+
+  MOCK_METHOD1(handler, Future<HttpResponse>(const HttpRequest&));  // Mocked so the test can supply the response.
+};
+
+
+TEST(libprocess, http)  // Issues a raw HTTP GET over TCP and checks the beginning of the response.
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  HttpProcess process;
+
+  EXPECT_CALL(process, handler(_))
+    .WillOnce(Return(HttpOKResponse()));  // The mocked handler answers with an HttpOKResponse.
+
+  spawn(process);
+
+  int s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);  // NOTE(review): a failing ASSERT_* below returns without close(s).
+
+  ASSERT_LE(0, s);
+
+  sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = PF_INET;  // NOTE(review): socket() above uses AF_INET; AF_INET is the usual constant for sin_family.
+  addr.sin_port = htons(process.self().port);
+  addr.sin_addr.s_addr = process.self().ip;  // No byte-order conversion here; assumes 'ip' is already in network order - confirm.
+
+  ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
+
+  std::ostringstream out;
+
+  out << "GET /" << process.self().id << "/" << "handler"
+      << " HTTP/1.0\r\n"
+      << "Connection: Keep-Alive\r\n"
+      << "\r\n";  // Blank line ends the request headers.
+
+  const std::string& data = out.str();
+
+  ASSERT_EQ(data.size(), write(s, data.data(), data.size()));  // NOTE(review): size_t vs ssize_t comparison; a short write fails the test.
+
+  std::string response = "HTTP/1.1 200 OK";  // Only this leading part of the reply is compared.
+
+  char temp[response.size()];  // NOTE(review): variable-length array - a compiler extension, not standard C++.
+
+  ASSERT_LT(0, read(s, temp, response.size()));  // Accepts any positive byte count, including a short read.
+
+  ASSERT_EQ(response, std::string(temp, response.size()));  // NOTE(review): after a short read this compares uninitialized bytes.
+
+  ASSERT_EQ(0, close(s));
+
+  terminate(process);
+  wait(process);
+}
+
+
 int main(int argc, char** argv)
 {
   // Initialize Google Mock/Test.

Added: incubator/mesos/trunk/third_party/libprocess/src/thread.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/thread.hpp?rev=1236485&view=auto
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/thread.hpp (added)
+++ incubator/mesos/trunk/third_party/libprocess/src/thread.hpp Fri Jan 27 01:25:13 2012
@@ -0,0 +1,42 @@
+#ifndef __PROCESS_THREAD_HPP__
+#define __PROCESS_THREAD_HPP__
+
+#include <pthread.h>
+
+#include <glog/logging.h>
+
+template <typename T>  // T: pointee type; the wrapper stores and returns T*.
+struct ThreadLocal  // Thin wrapper over a pthread key; declares no destructor and neither creates nor deletes the key.
+{
+  explicit ThreadLocal(pthread_key_t _key) : key(_key) {}  // The caller supplies an already-created key.
+
+  ThreadLocal<T>& operator = (T* t)  // Stores 't' under the key via pthread_setspecific.
+  {
+    if (pthread_setspecific(key, t) != 0) {
+      PLOG(FATAL) << "Failed to set thread local, pthread_setspecific";  // NOTE(review): PLOG appends errno, but pthread_setspecific reports failure via its return value.
+    }
+    return *this;
+  }
+
+  operator T* () const  // Implicit conversion to the stored pointer.
+  {
+    return reinterpret_cast<T*>(pthread_getspecific(key));  // NOTE(review): pthread_getspecific returns void*; static_cast would suffice.
+  }
+
+  T* operator -> () const  // Member access on the stored pointer; no NULL check is performed here.
+  {
+    return reinterpret_cast<T*>(pthread_getspecific(key));
+  }
+
+private:
+  // Declared private with no definition in this header, so these ThreadLocal-vs-ThreadLocal operators cannot be used from outside (other operators are not covered).
+  bool operator * (const ThreadLocal<T>&) const;  // Binary form (multiplication), not unary dereference.
+  bool operator == (const ThreadLocal<T>&) const;
+  bool operator != (const ThreadLocal<T>&) const;
+  bool operator < (const ThreadLocal<T>&) const;
+  bool operator > (const ThreadLocal<T>&) const;
+
+  const pthread_key_t key;  // const member, so the implicit copy assignment is unavailable.
+};
+
+#endif // __PROCESS_THREAD_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/src/timer.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/timer.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/timer.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/timer.cpp Fri Jan 27 01:25:13 2012
@@ -1,5 +1,7 @@
 #include <process/timer.hpp>
 
+#include "timeout.hpp"
+
 namespace process {
 
 class TimerProcess : public Process<TimerProcess>
@@ -27,6 +29,9 @@ private:
 };
 
 
+static void dispatch()
+
+
 Timer::Timer(double secs,
              const UPID& pid,
              std::tr1::function<void(ProcessBase*)>* dispatcher)
@@ -45,7 +50,7 @@ Timer::~Timer()
 
 void Timer::cancel()
 {
-  terminate(timer);
+  timeouts::cancel(timeout);
 }
 
 } // namespace process {



Mime
View raw message