mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject git commit: Task reconciliation for frameworks.
Date Sat, 12 Oct 2013 17:38:40 GMT
Updated Branches:
  refs/heads/master ceb3931a3 -> 4094fc69f


Task reconciliation for frameworks.

A framework can reconcile tasks in connection with a master fail-over
by sending the "last known state" of its tasks, in order to confirm or
reestablish the current state of the setup.

If existing tasks have changed state, a status update with the new
state will be sent.

Soon, we may be able to reliably tell if any slave or task is no
longer present. In that case, a status update with TASK_LOST should be
sent to the framework.

From: Niklas Nielsen <nik@qni.dk>
Review: https://reviews.apache.org/r/14540


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4094fc69
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4094fc69
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4094fc69

Branch: refs/heads/master
Commit: 4094fc69fea5e0cb1cf6072e5e560df84f08a889
Parents: ceb3931
Author: Benjamin Hindman <benjamin.hindman@gmail.com>
Authored: Sat Oct 12 10:35:41 2013 -0700
Committer: Benjamin Hindman <benjamin.hindman@gmail.com>
Committed: Sat Oct 12 10:37:56 2013 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto                       |  1 +
 include/mesos/scheduler.hpp                     |  9 +++
 src/common/protobuf_utils.hpp                   |  1 +
 src/exec/exec.cpp                               |  4 ++
 .../org_apache_mesos_MesosSchedulerDriver.cpp   | 44 ++++++++++++
 .../org/apache/mesos/MesosSchedulerDriver.java  |  2 +
 .../src/org/apache/mesos/SchedulerDriver.java   |  6 ++
 src/master/master.cpp                           | 54 ++++++++++++++
 src/master/master.hpp                           |  4 ++
 src/messages/messages.proto                     |  6 ++
 .../native/mesos_scheduler_driver_impl.cpp      | 48 +++++++++++++
 .../native/mesos_scheduler_driver_impl.hpp      |  4 ++
 src/sched/sched.cpp                             | 34 +++++++++
 src/tests/master_tests.cpp                      | 76 ++++++++++++++++++++
 14 files changed, 293 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 2d7284b..fe1d82b 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -389,6 +389,7 @@ message TaskStatus {
   required TaskState state = 2;
   optional string message = 4; // Possible message explaining state.
   optional bytes data = 3;
+  optional SlaveID slave_id = 5;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 442a4a7..4c04216 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -280,6 +280,13 @@ public:
   virtual Status sendFrameworkMessage(const ExecutorID& executorId,
                                       const SlaveID& slaveId,
                                       const std::string& data) = 0;
+
+  /**
+   * Reconciliation of tasks causes the master to send status updates for tasks
+   * whose status differs from the status sent here.
+   */
+  virtual Status reconcileTasks(
+      const std::vector<TaskStatus>& statuses) = 0;
 };
 
 
@@ -368,6 +375,8 @@ public:
   virtual Status sendFrameworkMessage(const ExecutorID& executorId,
                                       const SlaveID& slaveId,
                                       const std::string& data);
+  virtual Status reconcileTasks(
+      const std::vector<TaskStatus>& statuses);
 
 private:
   Scheduler* scheduler;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 5c5f052..19a49ab 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -63,6 +63,7 @@ inline StatusUpdate createStatusUpdate(
 
   TaskStatus* status = update.mutable_status();
   status->mutable_task_id()->MergeFrom(taskId);
+  status->mutable_slave_id()->MergeFrom(slaveId);
   status->set_state(state);
   status->set_message(message);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index d370560..7ca21fa 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -498,6 +498,10 @@ protected:
     update->set_uuid(UUID::random().toBytes());
     message.set_pid(self());
 
+    // Incoming status update might come from an executor which has not set
+    // slave id in TaskStatus. Set/overwrite slave id.
+    update->mutable_status()->mutable_slave_id()->CopyFrom(slaveId);;
+
     VLOG(1) << "Executor sending status update " << *update;
 
     // Capture the status update.

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
index 9df2143..9869929 100644
--- a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
+++ b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
@@ -843,4 +843,48 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_requestReso
   return convert<Status>(env, status);
 }
 
+/*
+ * Class:     org_apache_mesos_MesosSchedulerDriver
+ * Method:    reconcileTasks
+ * Signature: (Ljava/util/Collection;)Lorg/apache/mesos/Protos/Status;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_reconcileTasks
+  (JNIEnv* env, jobject thiz, jobject jstatuses)
+{
+  // Construct a C++ TaskStatus from each Java TaskStatus.
+  vector<TaskStatus> statuses;
+
+  jclass clazz = env->GetObjectClass(jstatuses);
+
+  // Iterator iterator = statuses.iterator();
+  jmethodID iterator =
+    env->GetMethodID(clazz, "iterator", "()Ljava/util/Iterator;");
+  jobject jiterator = env->CallObjectMethod(jstatuses, iterator);
+
+  clazz = env->GetObjectClass(jiterator);
+
+  // while (iterator.hasNext()) {
+  jmethodID hasNext = env->GetMethodID(clazz, "hasNext", "()Z");
+
+  jmethodID next = env->GetMethodID(clazz, "next", "()Ljava/lang/Object;");
+
+  while (env->CallBooleanMethod(jiterator, hasNext)) {
+    // Object status = iterator.next();
+    jobject jstatus = env->CallObjectMethod(jiterator, next);
+    const TaskStatus& status = construct<TaskStatus>(env, jstatus);
+    statuses.push_back(status);
+  }
+
+  // Now invoke the underlying driver.
+  clazz = env->GetObjectClass(thiz);
+
+  jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
+  MesosSchedulerDriver* driver =
+    (MesosSchedulerDriver*) env->GetLongField(thiz, __driver);
+
+  Status status = driver->reconcileTasks(statuses);
+
+  return convert<Status>(env, status);
+}
+
 } // extern "C" {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
index 662d987..ed4b4a3 100644
--- a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
@@ -169,6 +169,8 @@ public class MesosSchedulerDriver implements SchedulerDriver {
                                             SlaveID slaveId,
                                             byte[] data);
 
+  public native Status reconcileTasks(Collection<TaskStatus> statuses);
+
   protected native void initialize();
   protected native void finalize();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/java/src/org/apache/mesos/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/SchedulerDriver.java b/src/java/src/org/apache/mesos/SchedulerDriver.java
index c806a55..93aaa54 100644
--- a/src/java/src/org/apache/mesos/SchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/SchedulerDriver.java
@@ -142,4 +142,10 @@ public interface SchedulerDriver {
   Status sendFrameworkMessage(ExecutorID executorId,
                               SlaveID slaveId,
                               byte[] data);
+
+  /**
+   * Reconciliation of tasks causes the master to send status updates for tasks
+   * whose status differs from the status sent here.
+   */
+  Status reconcileTasks(Collection<TaskStatus> statuses);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 81f4ffa..1bf5d47 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -499,6 +499,11 @@ void Master::initialize()
       &StatusUpdateMessage::update,
       &StatusUpdateMessage::pid);
 
+  install<ReconcileTasksMessage>(
+      &Master::reconcileTasks,
+      &ReconcileTasksMessage::framework_id,
+      &ReconcileTasksMessage::statuses);
+
   install<ExitedExecutorMessage>(
       &Master::exitedExecutor,
       &ExitedExecutorMessage::slave_id,
@@ -1539,6 +1544,55 @@ void Master::deactivateSlave(const SlaveID& slaveId)
 }
 
 
+void Master::reconcileTasks(
+    const FrameworkID& frameworkId,
+    const std::vector<TaskStatus>& statuses)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING) << "Unknown framework " << frameworkId << " at " << from
+                 << " attempted to reconcile tasks";
+    return;
+  }
+
+  LOG(INFO) << "Performing task state reconciliation for framework "
+            << frameworkId;
+
+  // Verify expected task states and send status updates whenever expectations
+  // are not met. When:
+  //   1) Slave is unknown.*
+  //   2) Task is unknown.*
+  //   3) Task state has changed.
+  //
+  // *) TODO(nnielsen): Missing slaves and tasks are currently treated silently
+  //                    i.e. nothing is sent. To give accurate responses in
+  //                    these cases during master fail-over, we need to leverage
+  //                    the registrar.
+  foreach (const TaskStatus& status, statuses) {
+    if (!status.has_slave_id()) {
+      LOG(WARNING) << "Status from task " << status.task_id()
+                   << " does not include slave id";
+      continue;
+    }
+
+    Slave* slave = getSlave(status.slave_id());
+    if (slave != NULL) {
+      Task* task = slave->getTask(frameworkId, status.task_id());
+      if (task != NULL && task->state() != status.state()) {
+        const StatusUpdate& update = protobuf::createStatusUpdate(
+          frameworkId,
+          task->slave_id(),
+          task->task_id(),
+          task->state(),
+          "Task state changed");
+
+        statusUpdate(update, UPID());
+      }
+    }
+  }
+}
+
+
 void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId,
                                       const Time& reregisteredTime)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 2d38a28..9f5e25b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -132,6 +132,10 @@ public:
   void offer(const FrameworkID& framework,
              const hashmap<SlaveID, Resources>& resources);
 
+  void reconcileTasks(
+      const FrameworkID& frameworkId,
+      const std::vector<TaskStatus>& statuses);
+
   void authenticate(const UPID& pid);
 
 protected:

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index a57765e..a5dded2 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -206,6 +206,12 @@ message LostSlaveMessage {
 }
 
 
+message ReconcileTasksMessage {
+  required FrameworkID framework_id = 1;
+  repeated TaskStatus statuses = 2;
+}
+
+
 message FrameworkErrorMessage {
   required string message = 2;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/python/native/mesos_scheduler_driver_impl.cpp
----------------------------------------------------------------------
diff --git a/src/python/native/mesos_scheduler_driver_impl.cpp b/src/python/native/mesos_scheduler_driver_impl.cpp
index b170aec..059ed5d 100644
--- a/src/python/native/mesos_scheduler_driver_impl.cpp
+++ b/src/python/native/mesos_scheduler_driver_impl.cpp
@@ -122,6 +122,10 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = {
    (PyCFunction) MesosSchedulerDriverImpl_sendFrameworkMessage,
    METH_VARARGS,
    "Send a FrameworkMessage to a slave"},
+   {"reconcileTasks",
+   (PyCFunction) MesosSchedulerDriverImpl_reconcileTasks,
+   METH_VARARGS,
+   "Master sends status updates if task status is different from last known state."},
   {NULL}  /* Sentinel */
 };
 
@@ -538,5 +542,49 @@ PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage(
   return PyInt_FromLong(status); // Sets exception if creating long fails.
 }
 
+
+PyObject* MesosSchedulerDriverImpl_reconcileTasks(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args)
+{
+  if (self->driver == NULL) {
+    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL");
+    return NULL;
+  }
+
+  PyObject* statusesObj = NULL;
+  vector<TaskStatus> statuses;
+
+  if (!PyArg_ParseTuple(args, "O", &statusesObj)) {
+    return NULL;
+  }
+
+  if (!PyList_Check(statusesObj)) {
+    PyErr_Format(PyExc_Exception,
+      "Parameter 1 to reconcileTasks is not a list");
+
+    return NULL;
+  }
+
+  Py_ssize_t len = PyList_Size(statusesObj);
+  for (int i = 0; i < len; i++) {
+    PyObject* statusObj = PyList_GetItem(statusesObj, i);
+    if (statusObj == NULL) {
+      return NULL;
+    }
+
+    TaskStatus status;
+    if (!readPythonProtobuf(statusObj, &status)) {
+      PyErr_Format(PyExc_Exception,
+                   "Could not deserialize Python TaskStatus");
+      return NULL;
+    }
+    statuses.push_back(status);
+  }
+
+  Status status = self->driver->reconcileTasks(statuses);
+  return PyInt_FromLong(status);
+}
+
 } // namespace python {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/python/native/mesos_scheduler_driver_impl.hpp
----------------------------------------------------------------------
diff --git a/src/python/native/mesos_scheduler_driver_impl.hpp b/src/python/native/mesos_scheduler_driver_impl.hpp
index 83fdc18..9fb4867 100644
--- a/src/python/native/mesos_scheduler_driver_impl.hpp
+++ b/src/python/native/mesos_scheduler_driver_impl.hpp
@@ -110,6 +110,10 @@ PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage(
     MesosSchedulerDriverImpl* self,
     PyObject* args);
 
+PyObject* MesosSchedulerDriverImpl_reconcileTasks(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args);
+
 }} /* namespace mesos { namespace python { */
 
 #endif /* MESOS_SCHEDULER_DRIVER_IMPL_HPP */

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index f0f3008..824b4b7 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -857,6 +857,23 @@ protected:
     }
   }
 
+  void reconcileTasks(const vector<TaskStatus>& statuses)
+  {
+    if (!connected) {
+     VLOG(1) << "Ignoring task reconciliation as master is disconnected";
+     return;
+    }
+
+    ReconcileTasksMessage message;
+    message.mutable_framework_id()->MergeFrom(framework.id());
+
+    foreach (const TaskStatus& status, statuses) {
+      message.add_statuses()->MergeFrom(status);
+    }
+
+    send(master, message);
+  }
+
 private:
   friend class mesos::MesosSchedulerDriver;
 
@@ -1244,6 +1261,23 @@ Status MesosSchedulerDriver::sendFrameworkMessage(
 }
 
 
+Status MesosSchedulerDriver::reconcileTasks(
+    const vector<TaskStatus>& statuses)
+{
+  Lock lock(&mutex);
+
+  if (status != DRIVER_RUNNING) {
+    return status;
+  }
+
+  CHECK(process != NULL);
+
+  dispatch(process, &SchedulerProcess::reconcileTasks, statuses);
+
+  return status;
+}
+
+
 Status MesosSchedulerDriver::requestResources(
     const vector<Request>& requests)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4094fc69/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index d91691c..c325c10 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -1022,3 +1022,79 @@ TEST_F(MasterTest, MasterLost)
 
   Shutdown();
 }
+
+// Test sends different state than current and expects an update with
+// the current state of task.
+//
+// TODO(nnielsen): Stubs have been left for future test, where test sends
+// expected state of non-existing task and an update with TASK_LOST should
+// be received. Also (not currently covered) if statuses are up to date,
+// nothing should happen.
+TEST_F(MasterTest, ReconcileTaskTest)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestingIsolator isolator(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&isolator);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(1, 1, 512, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.start();
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  EXPECT_EQ(true, status.get().has_slave_id());
+
+  const TaskID taskId = status.get().task_id();
+  const SlaveID slaveId = status.get().slave_id();
+
+  // If framwework has different state, current state should be reported.
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  vector<TaskStatus> statuses;
+
+  TaskStatus differentStatus;
+  differentStatus.mutable_task_id()->CopyFrom(taskId);
+  differentStatus.mutable_slave_id()->CopyFrom(slaveId);
+  differentStatus.set_state(TASK_KILLED);
+
+  statuses.push_back(differentStatus);
+
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_RUNNING, status2.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'isolator' gets deallocated.
+}


Mime
View raw message