mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vinodk...@apache.org
Subject [3/6] mesos git commit: Updated balloon framework code with flags and better resource math.
Date Mon, 13 Jun 2016 21:35:36 GMT
Updated balloon framework code with flags and better resource math.

This gives the example `balloon-framework` enough options to run
outside of the build environment:

* Adds an option for restricting the amount of resources per task
  (otherwise, a single task will eat up an entire node).
* Adds an option for persisting the framework and launching one task
  after another.
* Adds filters for declined offers.

Review: https://reviews.apache.org/r/45604/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/61239701
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/61239701
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/61239701

Branch: refs/heads/master
Commit: 61239701dbb772d34807ca2d62dafce6cf9bc235
Parents: 0944533
Author: Joseph Wu <joseph@mesosphere.io>
Authored: Mon Jun 13 14:35:15 2016 -0700
Committer: Vinod Kone <vinodkone@gmail.com>
Committed: Mon Jun 13 14:35:15 2016 -0700

----------------------------------------------------------------------
 src/examples/balloon_framework.cpp  | 276 ++++++++++++++++++++-----------
 src/tests/balloon_framework_test.sh |   2 +-
 2 files changed, 181 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/61239701/src/examples/balloon_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/balloon_framework.cpp b/src/examples/balloon_framework.cpp
index 577f9ca..c7c59aa 100644
--- a/src/examples/balloon_framework.cpp
+++ b/src/examples/balloon_framework.cpp
@@ -14,18 +14,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <assert.h>
-#include <stdlib.h>
-
-#include <sys/param.h>
+#include <glog/logging.h>
 
 #include <iostream>
 #include <string>
 #include <vector>
 
+#include <mesos/resources.hpp>
 #include <mesos/scheduler.hpp>
 
-#include <stout/numify.hpp>
+#include <stout/bytes.hpp>
+#include <stout/flags.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
@@ -33,28 +32,102 @@
 
 #include "common/protobuf_utils.hpp"
 
-#include "examples/utils.hpp"
-
 using namespace mesos;
 using namespace mesos::internal;
 
-using std::cout;
-using std::endl;
 using std::string;
 
-// The amount of memory in MB the executor itself takes.
-const static size_t EXECUTOR_MEMORY_MB = 64;
+const double CPUS_PER_TASK = 0.1;
+
+const double CPUS_PER_EXECUTOR = 0.1;
+const int32_t MEM_PER_EXECUTOR = 64;
+
+class Flags : public flags::FlagsBase
+{
+public:
+  Flags()
+  {
+    add(&master,
+        "master",
+        "Master to connect to.");
+
+    add(&task_memory_usage_limit,
+        "task_memory_usage_limit",
+        None(),
+        "Maximum size, in bytes, of the task's memory usage.\n"
+        "The task will attempt to occupy memory up until this limit.",
+        static_cast<const Bytes*>(nullptr),
+        [](const Bytes& value) -> Option<Error> {
+          if (value.megabytes() < MEM_PER_EXECUTOR) {
+            return Error(
+                "Please use a --task_memory_usage_limit greater than " +
+                stringify(MEM_PER_EXECUTOR) + " MB");
+          }
+
+          return None();
+        });
+
+    add(&task_memory,
+        "task_memory",
+        "How much memory the framework will require per task.\n"
+        "If not specified, the task(s) will use all available memory in\n"
+        "applicable offers.");
+
+    add(&build_dir,
+        "build_dir",
+        "The build directory of Mesos. If set, the framework will assume\n"
+        "that the executor, framework, and agent(s) all live on the same\n"
+        "machine.");
+
+    add(&executor_uri,
+        "executor_uri",
+        "URI the fetcher should use to get the executor.");
+
+    add(&executor_command,
+        "executor_command",
+        "The command that should be used to start the executor.\n"
+        "This will override the value set by `--build_dir`.");
+
+    add(&checkpoint,
+        "checkpoint",
+        "Whether this framework should be checkpointed.\n",
+        false);
+
+    add(&long_running,
+        "long_running",
+        "Whether this framework should launch tasks repeatedly\n"
+        "or exit after finishing a single task.",
+        false);
+  }
+
+  string master;
+  Bytes task_memory_usage_limit;
+  Bytes task_memory;
+
+  // Flags for specifying the executor binary.
+  Option<string> build_dir;
+  Option<string> executor_uri;
+  Option<string> executor_command;
+
+  bool checkpoint;
+  bool long_running;
+};
 
 
+// This scheduler starts a single executor and task which gradually
+// increases its memory footprint up to a limit.  Depending on the
+// resource limits set for the container, the framework expects the
+// executor to either finish successfully or be OOM-killed.
 class BalloonScheduler : public Scheduler
 {
 public:
   BalloonScheduler(
       const ExecutorInfo& _executor,
-      size_t _balloonLimit)
+      const Flags& _flags)
     : executor(_executor),
-      balloonLimit(_balloonLimit),
-      taskLaunched(false) {}
+      flags(_flags),
+      taskActive(false),
+      tasksLaunched(0) {}
 
   virtual ~BalloonScheduler() {}
 
@@ -80,45 +153,46 @@ public:
       SchedulerDriver* driver,
       const std::vector<Offer>& offers)
   {
+    static const Resources TASK_RESOURCES = Resources::parse(
+        "cpus:" + stringify(CPUS_PER_TASK) +
+        ";mem:" + stringify(flags.task_memory.megabytes())).get();
+
+    static const Resources EXECUTOR_RESOURCES = Resources(executor.resources());
+
     LOG(INFO) << "Resource offers received";
 
-    for (size_t i = 0; i < offers.size(); i++) {
-      const Offer& offer = offers[i];
-
-      // We just launch one task.
-      if (!taskLaunched) {
-        double mem = getScalarResource(offer, "mem");
-        assert(mem > EXECUTOR_MEMORY_MB);
-
-        std::vector<TaskInfo> tasks;
-        LOG(INFO) << "Starting the task";
-
-        TaskInfo task;
-        task.set_name("Balloon Task");
-        task.mutable_task_id()->set_value("1");
-        task.mutable_slave_id()->MergeFrom(offer.slave_id());
-        task.mutable_executor()->MergeFrom(executor);
-        task.set_data(stringify<size_t>(balloonLimit));
-
-        // Use up all the memory from the offer.
-        Resource* resource;
-        resource = task.add_resources();
-        resource->set_name("mem");
-        resource->set_type(Value::SCALAR);
-        resource->mutable_scalar()->set_value(mem - EXECUTOR_MEMORY_MB);
-
-        // And all the CPU.
-        double cpus = getScalarResource(offer, "cpus");
-        resource = task.add_resources();
-        resource->set_name("cpus");
-        resource->set_type(Value::SCALAR);
-        resource->mutable_scalar()->set_value(cpus);
-
-        tasks.push_back(task);
-        driver->launchTasks(offer.id(), tasks);
-
-        taskLaunched = true;
+    foreach (const Offer& offer, offers) {
+      Resources resources(offer.resources());
+
+      // If there is an active task, or if the offer is not
+      // big enough, reject the offer.
+      if (taskActive || !resources.flatten().contains(
+            TASK_RESOURCES + EXECUTOR_RESOURCES)) {
+        Filters filters;
+        filters.set_refuse_seconds(600);
+
+        driver->declineOffer(offer.id(), filters);
+        continue;
       }
+
+      int taskId = tasksLaunched++;
+
+      LOG(INFO) << "Starting task " << taskId;
+
+      TaskInfo task;
+      task.set_name("Balloon Task");
+      task.mutable_task_id()->set_value(stringify(taskId));
+      task.mutable_slave_id()->MergeFrom(offer.slave_id());
+      task.mutable_resources()->CopyFrom(TASK_RESOURCES);
+      task.set_data(stringify(flags.task_memory_usage_limit));
+
+      task.mutable_executor()->CopyFrom(executor);
+      task.mutable_executor()->mutable_executor_id()
+        ->set_value(stringify(taskId));
+
+      driver->launchTasks(offer.id(), {task});
+
+      taskActive = true;
     }
   }
 
@@ -141,10 +215,12 @@ public:
     if (protobuf::isTerminalState(status.state())) {
       // NOTE: We expect TASK_FAILED here. The abort here ensures the shell
       // script invoking this test, considers the test result as 'PASS'.
-      if (status.state() == TASK_FAILED) {
-        driver->abort();
-      } else {
-        driver->stop();
+      if (!flags.long_running) {
+        if (status.state() == TASK_FAILED) {
+          driver->abort();
+        } else {
+          driver->stop();
+        }
       }
     }
   }
@@ -179,69 +255,77 @@ public:
 
 private:
   const ExecutorInfo executor;
-  const size_t balloonLimit;
-  bool taskLaunched;
+  const Flags flags;
+  bool taskActive;
+  int tasksLaunched;
 };
 
 
 int main(int argc, char** argv)
 {
-  if (argc != 3) {
-    std::cerr << "Usage: " << argv[0]
-              << " <master> <balloon limit in MB>" << std::endl;
-    return -1;
-  }
+  Flags flags;
+  Try<flags::Warnings> load = flags.load("MESOS_", argc, argv);
 
-  // Verify the balloon limit.
-  Try<size_t> limit = numify<size_t>(argv[2]);
-  if (limit.isError()) {
-    std::cerr << "Balloon limit is not a valid number" << std::endl;
-    return -1;
+  if (load.isError()) {
+    EXIT(EXIT_FAILURE) << flags.usage(load.error());
   }
 
-  if (limit.get() < EXECUTOR_MEMORY_MB) {
-    std::cerr << "Please use a balloon limit bigger than "
-              << EXECUTOR_MEMORY_MB << " MB" << std::endl;
-  }
+  const Resources resources = Resources::parse(
+      "cpus:" + stringify(CPUS_PER_EXECUTOR) +
+      ";mem:" + stringify(MEM_PER_EXECUTOR)).get();
+
+  ExecutorInfo executor;
+  executor.mutable_resources()->CopyFrom(resources);
+  executor.set_name("Balloon Executor");
+  executor.set_source("balloon_test");
+
+  // Determine the command to run the executor based on three possibilities:
+  //   1) `--executor_command` was set, which overrides the below cases.
+  //   2) We are in the Mesos build directory, so the targeted executable
+  //      is actually a libtool wrapper script.
+  //   3) We have not detected the Mesos build directory, so assume the
+  //      executor is in the same directory as the framework.
+  string command;
 
   // Find this executable's directory to locate executor.
-  string uri;
-  Option<string> value = os::getenv("MESOS_BUILD_DIR");
-  if (value.isSome()) {
-    uri = path::join(value.get(), "src", "balloon-executor");
+  if (flags.executor_command.isSome()) {
+    command = flags.executor_command.get();
+  } else if (flags.build_dir.isSome()) {
+    command = path::join(
+        flags.build_dir.get(), "src", "balloon-executor");
   } else {
-    uri = path::join(
+    command = path::join(
         os::realpath(Path(argv[0]).dirname()).get(),
         "balloon-executor");
   }
 
-  ExecutorInfo executor;
-  executor.mutable_executor_id()->set_value("default");
-  executor.mutable_command()->set_value(uri);
-  executor.set_name("Balloon Executor");
-  executor.set_source("balloon_test");
+  executor.mutable_command()->set_value(command);
+
+  // Copy `--executor_uri` into the command.
+  if (flags.executor_uri.isSome()) {
+    mesos::CommandInfo::URI* uri = executor.mutable_command()->add_uris();
+    uri->set_value(flags.executor_uri.get());
+    uri->set_executable(true);
+  }
 
-  Resource* mem = executor.add_resources();
-  mem->set_name("mem");
-  mem->set_type(Value::SCALAR);
-  mem->mutable_scalar()->set_value(EXECUTOR_MEMORY_MB);
+  BalloonScheduler scheduler(executor, flags);
 
-  BalloonScheduler scheduler(executor, limit.get());
+  // Log any flag warnings (after logging is initialized by the scheduler).
+  foreach (const flags::Warning& warning, load->warnings) {
+    LOG(WARNING) << warning.message;
+  }
 
   FrameworkInfo framework;
-  framework.set_user(""); // Have Mesos fill in the current user.
+  framework.set_user(os::user().get());
   framework.set_name("Balloon Framework (C++)");
-
-  value = os::getenv("MESOS_CHECKPOINT");
-  if (value.isSome()) {
-    framework.set_checkpoint(
-        numify<bool>(value.get()).get());
-  }
+  framework.set_checkpoint(flags.checkpoint);
 
   MesosSchedulerDriver* driver;
-  value = os::getenv("MESOS_AUTHENTICATE_FRAMEWORKS");
+
+  // TODO(josephw): Refactor these into a common set of flags.
+  Option<string> value = os::getenv("MESOS_AUTHENTICATE_FRAMEWORKS");
   if (value.isSome()) {
-    cout << "Enabling authentication for the framework" << endl;
+    LOG(INFO) << "Enabling authentication for the framework";
 
     value = os::getenv("DEFAULT_PRINCIPAL");
     if (value.isNone()) {
@@ -263,12 +347,12 @@ int main(int argc, char** argv)
     credential.set_secret(value.get());
 
     driver = new MesosSchedulerDriver(
-        &scheduler, framework, argv[1], credential);
+        &scheduler, framework, flags.master, credential);
   } else {
     framework.set_principal("balloon-framework-cpp");
 
     driver = new MesosSchedulerDriver(
-        &scheduler, framework, argv[1]);
+        &scheduler, framework, flags.master);
   }
 
   int status = driver->run() == DRIVER_STOPPED ? 0 : 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/61239701/src/tests/balloon_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/balloon_framework_test.sh b/src/tests/balloon_framework_test.sh
index a242f6c..e0b0b04 100755
--- a/src/tests/balloon_framework_test.sh
+++ b/src/tests/balloon_framework_test.sh
@@ -115,7 +115,7 @@ if [[ ${STATUS} -ne 0 ]]; then
 fi
 
 # The main event!
-${BALLOON_FRAMEWORK} 127.0.0.1:5432 1024
+${BALLOON_FRAMEWORK} --master=127.0.0.1:5432 --task_memory_usage_limit=1024MB --task_memory=32MB
 STATUS=${?}
 
 # Make sure the balloon framework "failed".


Mime
View raw message