mesos-commits mailing list archives

From b...@apache.org
Subject [1/4] mesos git commit: Added a cache to the Fetcher.
Date Mon, 01 Jun 2015 13:45:59 GMT
Repository: mesos
Updated Branches:
  refs/heads/master b16999a4c -> 7aede4ad4


http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/fetcher_cache_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp
new file mode 100644
index 0000000..99777f8
--- /dev/null
+++ b/src/tests/fetcher_cache_tests.cpp
@@ -0,0 +1,1359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unistd.h>
+
+#include <gmock/gmock.h>
+
+#include <list>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/check.hpp>
+#include <process/clock.hpp>
+#include <process/collect.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/message.hpp>
+#include <process/owned.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+#include <process/queue.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/try.hpp>
+
+#include "common/lock.hpp"
+
+#include "master/flags.hpp"
+#include "master/master.hpp"
+
+#include "slave/constants.hpp"
+#include "slave/gc.hpp"
+#include "slave/flags.hpp"
+#include "slave/paths.hpp"
+#include "slave/slave.hpp"
+
+#include "slave/containerizer/fetcher.hpp"
+
+#include "tests/containerizer.hpp"
+#include "tests/flags.hpp"
+#include "tests/mesos.hpp"
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::MesosContainerizer;
+using mesos::internal::slave::MesosContainerizerProcess;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::FetcherProcess;
+
+using process::Future;
+using process::HttpEvent;
+using process::Owned;
+using process::PID;
+using process::Promise;
+using process::Process;
+using process::Queue;
+using process::Subprocess;
+
+using std::list;
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::DoAll;
+using testing::DoDefault;
+using testing::Eq;
+using testing::Invoke;
+using testing::InvokeWithoutArgs;
+using testing::Return;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+static const string ASSETS_DIRECTORY_NAME = "mesos-fetcher-test-assets";
+static const string COMMAND_NAME = "mesos-fetcher-test-cmd";
+static const string ARCHIVE_NAME = "mesos-fetcher-test-archive.tgz";
+static const string ARCHIVED_COMMAND_NAME = "mesos-fetcher-test-acmd";
+
+// Every task executes one of these shell scripts, each of which creates a
+// file whose name includes the current task name. The task name is
+// expected to be passed in as a script argument. The existence of the
+// file with that name is then used as proof that the task ran
+// successfully.
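+// For example, a task that runs COMMAND_SCRIPT with argument "1" creates a
+// file named "mesos-fetcher-test-cmd1" (COMMAND_NAME + task name) in its
+// sandbox.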
+static const string COMMAND_SCRIPT = "touch " + COMMAND_NAME + "$1";
+static const string ARCHIVED_COMMAND_SCRIPT =
+  "touch " + ARCHIVED_COMMAND_NAME + "$1";
+
+
+class FetcherCacheTest : public MesosTest
+{
+public:
+  struct Task {
+    Path runDirectory;
+    Queue<TaskStatus> statusQueue;
+  };
+
+  void setupCommandFileAsset();
+
+protected:
+  void setupArchiveAsset();
+
+  virtual void SetUp();
+  virtual void TearDown();
+
+  // Sets up the slave and starts it. Calling this late in the test,
+  // instead of having it included in SetUp(), gives us the opportunity
+  // to manipulate values in 'flags' first.
+  void startSlave();
+
+  // Stops the slave, deleting the containerizer, for subsequent
+  // recovery testing.
+  void stopSlave();
+
+  Task launchTask(const CommandInfo& commandInfo, const size_t taskIndex);
+
+  vector<Task> launchTasks(const vector<CommandInfo>& commandInfos);
+
+  // Waits until FetcherProcess::_fetch() has been called for all tasks.
+  void awaitFetchContention();
+
+  string assetsDirectory;
+  string commandPath;
+  string archivePath;
+
+  slave::Flags flags;
+  MesosContainerizer* containerizer;
+  PID<Slave> slavePid;
+  SlaveID slaveId;
+  string cacheDirectory;
+  MockFetcherProcess* fetcherProcess;
+  MockScheduler scheduler;
+  MesosSchedulerDriver* driver;
+
+private:
+  Fetcher* fetcher;
+
+  FrameworkID frameworkId;
+
+  // Promises whose futures indicate that FetcherProcess::_fetch() has been
+  // called for a task with a given index.
+  vector<Owned<Promise<Nothing>>> fetchContentionWaypoints;
+};
+
+
+void FetcherCacheTest::SetUp()
+{
+  MesosTest::SetUp();
+
+  flags = CreateSlaveFlags();
+  flags.resources =
+    Some(stringify(Resources::parse("cpus:1000;mem:1000").get()));
+
+  assetsDirectory = path::join(flags.work_dir, ASSETS_DIRECTORY_NAME);
+  ASSERT_SOME(os::mkdir(assetsDirectory));
+
+  setupCommandFileAsset();
+  setupArchiveAsset();
+
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
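+  // Use a mock fetcher process so tests can intercept FetcherProcess::run()
+  // and _fetch() calls while still delegating to the real implementation
+  // (see the unmocked_* helpers used in the tests below).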
+  fetcherProcess = new MockFetcherProcess();
+  fetcher = new Fetcher(Owned<FetcherProcess>(fetcherProcess));
+
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.set_name("default");
+  frameworkInfo.set_checkpoint(true);
+
+  driver = new MesosSchedulerDriver(
+    &scheduler, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(scheduler, registered(driver, _, _))
+    .Times(1);
+}
+
+
+void FetcherCacheTest::TearDown()
+{
+  driver->stop();
+  driver->join();
+  delete driver;
+
+  delete fetcher;
+
+  MesosTest::TearDown();
+}
+
+
+// TODO(bernd-mesos): Make these abstractions as generic and as generally
+// available for all testing as possible.
+void FetcherCacheTest::startSlave()
+{
+  Try<MesosContainerizer*> create = MesosContainerizer::create(
+      flags, true, fetcher);
+  ASSERT_SOME(create);
+  containerizer = create.get();
+
+  Try<PID<Slave>> pid = StartSlave(containerizer, flags);
+  ASSERT_SOME(pid);
+  slavePid = pid.get();
+
+  // Obtain the slave ID.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+  AWAIT_READY(slaveRegisteredMessage);
+  slaveId = slaveRegisteredMessage.get().slave_id();
+
+  cacheDirectory =
+    slave::paths::getSlavePath(flags.fetcher_cache_dir, slaveId);
+}
+
+
+void FetcherCacheTest::stopSlave()
+{
+  Stop(slavePid);
+  delete containerizer;
+}
+
+
+void FetcherCacheTest::setupCommandFileAsset()
+{
+  commandPath = path::join(assetsDirectory, COMMAND_NAME);
+  ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT));
+
+  // Make the command file read-only, so we can tell whether the URI's
+  // executable flag takes effect.
+  ASSERT_SOME(os::chmod(commandPath, S_IRUSR | S_IRGRP | S_IROTH));
+}
+
+
+void FetcherCacheTest::setupArchiveAsset()
+{
+  string path = path::join(assetsDirectory, ARCHIVED_COMMAND_NAME);
+  ASSERT_SOME(os::write(path, ARCHIVED_COMMAND_SCRIPT));
+
+  // Make the archived command file executable before archiving it,
+  // since the executable flag for CommandInfo::URI has no effect on
+  // what comes out of an archive.
+  ASSERT_SOME(os::chmod(path, S_IRWXU | S_IRWXG | S_IRWXO));
+
+  const string cwd = os::getcwd();
+  ASSERT_SOME(os::chdir(assetsDirectory));
+  ASSERT_SOME(os::tar(ARCHIVED_COMMAND_NAME, ARCHIVE_NAME));
+  ASSERT_SOME(os::chdir(cwd));
+  archivePath = path::join(assetsDirectory, ARCHIVE_NAME);
+
+  // Make the archive file read-only, so we can tell if it becomes
+  // executable by accident.
+  ASSERT_SOME(os::chmod(archivePath, S_IRUSR | S_IRGRP | S_IROTH));
+}
+
+
+static string taskName(int taskIndex)
+{
+  return stringify(taskIndex);
+}
+
+
+// TODO(bernd-mesos): Use Path, not string, create Path::executable().
+static bool isExecutable(const string& path)
+{
+  Try<bool> access = os::access(path, X_OK);
+  EXPECT_SOME(access);
+  return access.isSome() && access.get();
+}
+
+
+// Create a future that indicates that the task observed by the given
+// status queue is finished.
+static Future<Nothing> awaitFinished(FetcherCacheTest::Task task)
+{
+  return task.statusQueue.get()
+    .then([=](const TaskStatus& status) -> Future<Nothing> {
+      if (status.state() == TASK_FINISHED) {
+        return Nothing();
+      }
+      return awaitFinished(task);
+  });
+}
+
+
+// Create a future that indicates that all tasks are finished.
+// TODO(bernd-mesos): Make these abstractions as generic and as generally
+// available for all testing as possible.
+static Future<list<Nothing>> awaitFinished(
+    vector<FetcherCacheTest::Task> tasks)
+{
+  list<Future<Nothing>> futures;
+
+  foreach (FetcherCacheTest::Task task, tasks) {
+    futures.push_back(awaitFinished(task));
+  }
+
+  return collect(futures);
+}
+
+
+// Pushes the TaskStatus value in mock call argument #1 into the
+// given queue, which later on shall be queried by awaitFinished().
+ACTION_P(PushTaskStatus, taskStatusQueue)
+{
+  TaskStatus taskStatus = arg1;
+
+  // Input parameters of ACTION_P are const. We make a mutable copy
+  // so that we can use put().
+  Queue<TaskStatus> queue = taskStatusQueue;
+
+  queue.put(taskStatus);
+}
+
+
+// Launches a task as described by its CommandInfo and returns its sandbox
+// run directory path. Its completion will be indicated by the result of
+// awaitFinished(task), where `task` is the return value of this method.
+// TODO(bernd-mesos): Make these abstractions as generic and as generally
+// available for all testing as possible.
+FetcherCacheTest::Task FetcherCacheTest::launchTask(
+    const CommandInfo& commandInfo,
+    const size_t taskIndex)
+{
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(scheduler, resourceOffers(driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(DeclineOffers());
+
+  offers.await(Seconds(15));
+  CHECK_READY(offers) << "Failed to wait for resource offers";
+
+  EXPECT_NE(0u, offers.get().size());
+  const Offer offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name(taskName(taskIndex));
+  task.mutable_task_id()->set_value(taskName(taskIndex));
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+
+  // We don't care about resources in these tests. This small amount
+  // will always succeed.
+  task.mutable_resources()->CopyFrom(
+      Resources::parse("cpus:1;mem:1").get());
+
+  task.mutable_command()->CopyFrom(commandInfo);
+
+  // Since we are always using a command executor here, the executor
+  // ID can be determined by copying the task ID.
+  ExecutorID executorId;
+  executorId.set_value(task.task_id().value());
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
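+  // Collect all status updates for this task in a queue, which
+  // awaitFinished() polls until it sees TASK_FINISHED.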
+  Queue<TaskStatus> taskStatusQueue;
+
+  EXPECT_CALL(scheduler, statusUpdate(driver, _))
+    .WillRepeatedly(PushTaskStatus(taskStatusQueue));
+
+  driver->launchTasks(offer.id(), tasks);
+
+  const Path path = Path(slave::paths::getExecutorLatestRunPath(
+      flags.work_dir,
+      slaveId,
+      offer.framework_id(),
+      executorId));
+
+  return Task {path, taskStatusQueue};
+}
+
+
+// Pushes the task status value of a task status update callback
+// into the task status queue that corresponds to the task index/ID
+// for which the status update is being reported. 'tasks' must be a
+// 'vector<Task>', where every slot index corresponds to a task
+// index/ID.
+// TODO(bernd-mesos): Make these abstractions as generic and as generally
+// available for all testing as possible.
+ACTION_TEMPLATE(PushIndexedTaskStatus,
+                HAS_1_TEMPLATE_PARAMS(int, k),
+                AND_1_VALUE_PARAMS(tasks))
+{
+  TaskStatus taskStatus = ::std::tr1::get<k>(args);
+  Try<int> taskId = numify<int>(taskStatus.task_id().value());
+  ASSERT_SOME(taskId);
+  Queue<TaskStatus> queue = (tasks)[taskId.get()].statusQueue;
+  queue.put(taskStatus);
+}
+
+
+// Satisfies the first promise in the list that is not satisfied yet.
+ACTION_P(SatisfyOne, promises)
+{
+  foreach (const Owned<Promise<Nothing>>& promise, *promises) {
+    if (promise->future().isPending()) {
+      promise->set(Nothing());
+      return;
+    }
+  }
+
+  FAIL() << "Tried to call FetcherProcess::_fetch() "
+         << "for more tasks than launched";
+}
+
+
+// Launches the tasks described by the given CommandInfos and returns a
+// vector holding their sandbox run directory paths. All these tasks run
+// concurrently. Their completion will be indicated by the result of
+// awaitFinished(tasks), where `tasks` is the return value of this
+// method.
+// TODO(bernd-mesos): Make these abstractions as generic and as generally
+// available for all testing as possible.
+vector<FetcherCacheTest::Task> FetcherCacheTest::launchTasks(
+    const vector<CommandInfo>& commandInfos)
+{
+  vector<FetcherCacheTest::Task> result;
+
+  // When _fetch() is called, satisfy a promise to notify us that a task
+  // has passed the code stretch in which it competes for cache entries.
+  EXPECT_CALL(*fetcherProcess, _fetch(_, _, _, _, _, _, _))
+    .WillRepeatedly(
+        DoAll(SatisfyOne(&fetchContentionWaypoints),
+              Invoke(fetcherProcess, &MockFetcherProcess::unmocked__fetch)));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(scheduler, resourceOffers(driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(DeclineOffers());
+
+  offers.await(Seconds(15));
+  CHECK_READY(offers) << "Failed to wait for resource offers";
+
+  EXPECT_NE(0u, offers.get().size());
+  const Offer offer = offers.get()[0];
+
+  vector<TaskInfo> tasks;
+  foreach (const CommandInfo& commandInfo, commandInfos) {
+    size_t taskIndex = tasks.size();
+
+    // Grab the framework ID from the offer. It does not matter that this
+    // happens once per task, since we expect the framework ID to remain
+    // the same.
+    frameworkId = offer.framework_id();
+
+    TaskInfo task;
+    task.set_name(taskName(taskIndex));
+    task.mutable_task_id()->set_value(taskName(taskIndex));
+    task.mutable_slave_id()->CopyFrom(offer.slave_id());
+
+    // We don't care about resources in these tests. This small amount
+    // will always succeed.
+    task.mutable_resources()->CopyFrom(
+        Resources::parse("cpus:1;mem:1").get());
+
+    task.mutable_command()->CopyFrom(commandInfo);
+
+    tasks.push_back(task);
+
+    // Since we are always using a command executor here, the executor
+    // ID can be determined by copying the task ID.
+    ExecutorID executorId;
+    executorId.set_value(task.task_id().value());
+
+    Path runDirectory = Path(slave::paths::getExecutorLatestRunPath(
+        flags.work_dir,
+        slaveId,
+        frameworkId,
+        executorId));
+
+    // Make a queue of task statuses for each task, so we can later wait
+    // until an element indicates status TASK_FINISHED. We use a queue,
+    // because we never know which status update will be the one we have
+    // been waiting for.
+    Queue<TaskStatus> taskStatusQueue;
+
+    result.push_back(Task {runDirectory, taskStatusQueue});
+
+    EXPECT_CALL(scheduler, statusUpdate(driver, _))
+      .WillRepeatedly(PushIndexedTaskStatus<1>(result));
+
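+    // One contention waypoint per task: SatisfyOne() above satisfies the
+    // next pending promise when _fetch() is called, and
+    // awaitFetchContention() later waits for all of them.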
+    auto waypoint = Owned<Promise<Nothing>>(new Promise<Nothing>());
+    fetchContentionWaypoints.push_back(waypoint);
+  }
+
+  driver->launchTasks(offer.id(), tasks);
+
+  return result;
+}
+
+
+// Ensure that FetcherProcess::_fetch() has been called for each task,
+// which means that all tasks are competing for downloading the same URIs.
+void FetcherCacheTest::awaitFetchContention()
+{
+  foreach (const Owned<Promise<Nothing>>& waypoint, fetchContentionWaypoints) {
+    AWAIT(waypoint->future());
+  }
+}
+
+
+// Tests fetching from the local asset directory without cache. This
+// gives us a baseline for the following tests and lets us debug our
+// test infrastructure without extra complications.
+TEST_F(FetcherCacheTest, LocalUncached)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(commandPath);
+    uri.set_executable(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    EXPECT_EQ(0u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(0u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+    const string path = path::join(task.runDirectory.value, COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+  }
+}
+
+
+// Tests fetching from the local asset directory with simple caching.
+// Only one download must occur. Fetching is serialized, to cover
+// code areas without overlapping/concurrent fetch attempts.
+TEST_F(FetcherCacheTest, LocalCached)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(commandPath);
+    uri.set_executable(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    const string path = path::join(task.runDirectory.value, COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(1u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+  }
+}
+
+
+// Tests falling back on bypassing the cache when fetching the download
+// size of a URI that is supposed to be cached fails.
+TEST_F(FetcherCacheTest, CachedFallback)
+{
+  startSlave();
+  driver->start();
+
+  // Make sure the content-length request fails.
+  ASSERT_SOME(os::rm(commandPath));
+
+  CommandInfo::URI uri;
+  uri.set_value(commandPath);
+  uri.set_executable(true);
+  uri.set_cache(true);
+
+  CommandInfo commandInfo;
+  commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(0));
+  commandInfo.add_uris()->CopyFrom(uri);
+
+  // Bring back the asset just before running mesos-fetcher to fetch it.
+  Future<FetcherInfo> fetcherInfo;
+  EXPECT_CALL(*fetcherProcess, run(_, _, _))
+    .WillOnce(DoAll(FutureArg<1>(&fetcherInfo),
+                    InvokeWithoutArgs(this,
+                                      &FetcherCacheTest::setupCommandFileAsset),
+                    Invoke(fetcherProcess,
+                           &MockFetcherProcess::unmocked_run)));
+
+  const Task task = launchTask(commandInfo, 0);
+
+  AWAIT_READY(awaitFinished(task));
+
+  const string path = path::join(task.runDirectory.value, COMMAND_NAME);
+  EXPECT_TRUE(isExecutable(path));
+  EXPECT_TRUE(os::exists(path + taskName(0)));
+
+  AWAIT_READY(fetcherInfo);
+
+  EXPECT_EQ(1, fetcherInfo.get().items_size());
+  EXPECT_EQ(FetcherInfo::Item::BYPASS_CACHE,
+            fetcherInfo.get().items(0).action());
+
+  EXPECT_EQ(0u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(0u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+}
+
+
+// Tests archive extraction without caching as a baseline for the
+// subsequent test below.
+TEST_F(FetcherCacheTest, LocalUncachedExtract)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(archivePath);
+    uri.set_extract(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    EXPECT_TRUE(os::exists(
+        path::join(task.runDirectory.value, ARCHIVE_NAME)));
+    EXPECT_FALSE(isExecutable(
+        path::join(task.runDirectory.value, ARCHIVE_NAME)));
+
+    const string path =
+      path::join(task.runDirectory.value, ARCHIVED_COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(0u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(0u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+  }
+}
+
+
+// Tests archive extraction in combination with caching.
+TEST_F(FetcherCacheTest, LocalCachedExtract)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(archivePath);
+    uri.set_extract(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    EXPECT_FALSE(os::exists(
+        path::join(task.runDirectory.value, ARCHIVE_NAME)));
+
+    const string path =
+      path::join(task.runDirectory.value, ARCHIVED_COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(1u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+  }
+}
+
+
+class FetcherCacheHttpTest : public FetcherCacheTest
+{
+public:
+  // A minimal HTTP server (not intended as a general-purpose actor) that
+  // reuses existing libprocess facilities to serve HTTP requests for file
+  // downloads. It also counts how many requests are made and can pause
+  // answering requests, stalling them.
+  class HttpServer : public Process<HttpServer>
+  {
+  public:
+    HttpServer(FetcherCacheHttpTest* test)
+      : countRequests(0),
+        countCommandRequests(0),
+        countArchiveRequests(0)
+    {
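+      // Serve the test assets over HTTP under this process's endpoint
+      // (see url() below).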
+      provide(COMMAND_NAME, test->commandPath);
+      provide(ARCHIVE_NAME, test->archivePath);
+
+      spawn(this);
+    }
+
+    string url()
+    {
+      return "http://127.0.0.1:" +
+             stringify(self().address.port) +
+             "/" + self().id + "/";
+    }
+
+    // Stalls the execution of HTTP requests inside visit().
+    void pause()
+    {
+      mutex.lock();
+    }
+
+    void resume()
+    {
+      mutex.unlock();
+    }
+
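+    // Intercepts every HTTP request to count it by asset type before
+    // delegating to the default handling. While pause() holds the mutex,
+    // request handling stalls here.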
+    virtual void visit(const HttpEvent& event)
+    {
+      std::lock_guard<std::mutex> lock(mutex);
+
+      countRequests++;
+
+      if (strings::contains(event.request->path, COMMAND_NAME)) {
+        countCommandRequests++;
+      }
+
+      if (strings::contains(event.request->path, ARCHIVE_NAME)) {
+        countArchiveRequests++;
+      }
+
+      ProcessBase::visit(event);
+    }
+
+    void resetCounts()
+    {
+      countRequests = 0;
+      countCommandRequests = 0;
+      countArchiveRequests = 0;
+    }
+
+    size_t countRequests;
+    size_t countCommandRequests;
+    size_t countArchiveRequests;
+
+  private:
+    std::mutex mutex;
+  };
+
+
+  virtual void SetUp()
+  {
+    FetcherCacheTest::SetUp();
+
+    httpServer = new HttpServer(this);
+  }
+
+  virtual void TearDown()
+  {
+    terminate(httpServer);
+    wait(httpServer);
+    delete httpServer;
+
+    FetcherCacheTest::TearDown();
+  }
+
+  HttpServer* httpServer;
+};
+
+
+// Tests fetching via HTTP with caching. Only one download must
+// occur. Fetching is serialized, to cover code areas without
+// overlapping/concurrent fetch attempts.
+TEST_F(FetcherCacheHttpTest, HttpCachedSerialized)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(httpServer->url() + COMMAND_NAME);
+    uri.set_executable(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    const string path =
+      path::join(task.runDirectory.value, COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(1u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+    // 2 requests: 1 for content-length, 1 for download.
+    EXPECT_EQ(2u, httpServer->countCommandRequests);
+  }
+}
+
+
+// Tests multiple concurrent fetching efforts that require some
+// concurrency control. One task must "win" and perform the size
+// and download request for the URI alone. The others must reuse
+// the result.
+TEST_F(FetcherCacheHttpTest, HttpCachedConcurrent)
+{
+  startSlave();
+  driver->start();
+
+  // Causes fetch contention. No task can run yet until resume().
+  httpServer->pause();
+
+  vector<CommandInfo> commandInfos;
+  const size_t countTasks = 5;
+
+  for (size_t i = 0; i < countTasks; i++) {
+    CommandInfo::URI uri0;
+    uri0.set_value(httpServer->url() + COMMAND_NAME);
+    uri0.set_executable(true);
+    uri0.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri0);
+
+    // Not caching this URI every time means it gets downloaded some of
+    // the time. Thus we exercise code paths that eagerly fetch new assets
+    // while waiting for pending downloads of cached assets, as well as
+    // code paths where no downloading occurs at all.
+    if (i % 2 == 1) {
+      CommandInfo::URI uri1;
+      uri1.set_value(httpServer->url() + ARCHIVE_NAME);
+      commandInfo.add_uris()->CopyFrom(uri1);
+    }
+
+    commandInfos.push_back(commandInfo);
+  }
+
+  vector<Task> tasks = launchTasks(commandInfos);
+
+  CHECK_EQ(countTasks, tasks.size());
+
+  // Since the HTTP server is paused, this proves that fetch contention
+  // has happened. All tasks have passed the point where it occurs,
+  // but they are not running yet.
+  awaitFetchContention();
+
+  // Now let the tasks run.
+  httpServer->resume();
+
+  AWAIT_READY(awaitFinished(tasks));
+
+  EXPECT_EQ(1u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+  // command content-length requests: 1
+  // command downloads: 1
+  // archive downloads: 2
+  EXPECT_EQ(2u, httpServer->countCommandRequests);
+  EXPECT_EQ(2u, httpServer->countArchiveRequests);
+
+  for (size_t i = 0; i < countTasks; i++) {
+    EXPECT_EQ(i % 2 == 1, os::exists(
+        path::join(tasks[i].runDirectory.value, ARCHIVE_NAME)));
+    EXPECT_TRUE(isExecutable(
+        path::join(tasks[i].runDirectory.value, COMMAND_NAME)));
+    EXPECT_TRUE(os::exists(
+        path::join(tasks[i].runDirectory.value, COMMAND_NAME + taskName(i))));
+  }
+}
+
+
+// Tests using multiple URIs per command, variations of caching,
+// setting the executable flag, and archive extraction.
+TEST_F(FetcherCacheHttpTest, HttpMixed)
+{
+  startSlave();
+  driver->start();
+
+  // Causes fetch contention. No task can run yet until resume().
+  httpServer->pause();
+
+  vector<CommandInfo> commandInfos;
+
+  // Task 0.
+
+  CommandInfo::URI uri00;
+  uri00.set_value(httpServer->url() + ARCHIVE_NAME);
+  uri00.set_cache(true);
+  uri00.set_extract(false);
+  uri00.set_executable(false);
+
+  CommandInfo::URI uri01;
+  uri01.set_value(httpServer->url() + COMMAND_NAME);
+  uri01.set_extract(false);
+  uri01.set_executable(true);
+
+  CommandInfo commandInfo0;
+  commandInfo0.set_value("./" + COMMAND_NAME + " " + taskName(0));
+  commandInfo0.add_uris()->CopyFrom(uri00);
+  commandInfo0.add_uris()->CopyFrom(uri01);
+  commandInfos.push_back(commandInfo0);
+
+  // Task 1.
+
+  CommandInfo::URI uri10;
+  uri10.set_value(httpServer->url() + ARCHIVE_NAME);
+  uri10.set_extract(true);
+  uri10.set_executable(false);
+
+  CommandInfo::URI uri11;
+  uri11.set_value(httpServer->url() + COMMAND_NAME);
+  uri11.set_extract(true);
+  uri11.set_executable(false);
+
+  CommandInfo commandInfo1;
+  commandInfo1.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(1));
+  commandInfo1.add_uris()->CopyFrom(uri10);
+  commandInfo1.add_uris()->CopyFrom(uri11);
+  commandInfos.push_back(commandInfo1);
+
+  // Task 2.
+
+  CommandInfo::URI uri20;
+  uri20.set_value(httpServer->url() + ARCHIVE_NAME);
+  uri20.set_cache(true);
+  uri20.set_extract(true);
+  uri20.set_executable(false);
+
+  CommandInfo::URI uri21;
+  uri21.set_value(httpServer->url() + COMMAND_NAME);
+  uri21.set_extract(false);
+  uri21.set_executable(false);
+
+  CommandInfo commandInfo2;
+  commandInfo2.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(2));
+  commandInfo2.add_uris()->CopyFrom(uri20);
+  commandInfo2.add_uris()->CopyFrom(uri21);
+  commandInfos.push_back(commandInfo2);
+
+  vector<Task> tasks = launchTasks(commandInfos);
+
+  CHECK_EQ(3u, tasks.size());
+
+  // Since the HTTP server is paused, this proves that fetch contention
+  // has happened. All tasks have passed the point where it occurs,
+  // but they are not running yet.
+  awaitFetchContention();
+
+  // Now let the tasks run.
+  httpServer->resume();
+
+  AWAIT_READY(awaitFinished(tasks));
+
+  EXPECT_EQ(1u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+  // command content-length requests: 0
+  // command downloads: 3
+  // archive content-length requests: 1
+  // archive downloads: 2
+  EXPECT_EQ(3u, httpServer->countCommandRequests);
+  EXPECT_EQ(3u, httpServer->countArchiveRequests);
+
+  // Task 0.
+
+  EXPECT_FALSE(isExecutable(
+      path::join(tasks[0].runDirectory.value, ARCHIVE_NAME)));
+  EXPECT_FALSE(os::exists(
+      path::join(tasks[0].runDirectory.value, ARCHIVED_COMMAND_NAME)));
+
+  EXPECT_TRUE(isExecutable(
+      path::join(tasks[0].runDirectory.value, COMMAND_NAME)));
+  EXPECT_TRUE(os::exists(
+      path::join(tasks[0].runDirectory.value, COMMAND_NAME + taskName(0))));
+
+  // Task 1.
+
+  EXPECT_FALSE(isExecutable(
+      path::join(tasks[1].runDirectory.value, ARCHIVE_NAME)));
+  EXPECT_TRUE(isExecutable(
+      path::join(tasks[1].runDirectory.value, ARCHIVED_COMMAND_NAME)));
+  EXPECT_TRUE(os::exists(path::join(
+      tasks[1].runDirectory.value, ARCHIVED_COMMAND_NAME + taskName(1))));
+
+  EXPECT_FALSE(isExecutable(
+      path::join(tasks[1].runDirectory.value, COMMAND_NAME)));
+
+  // Task 2.
+
+  EXPECT_FALSE(os::exists(
+      path::join(tasks[2].runDirectory.value, ARCHIVE_NAME)));
+  EXPECT_TRUE(isExecutable(
+      path::join(tasks[2].runDirectory.value, ARCHIVED_COMMAND_NAME)));
+  EXPECT_TRUE(os::exists(path::join(
+      tasks[2].runDirectory.value, ARCHIVED_COMMAND_NAME + taskName(2))));
+
+  EXPECT_FALSE(isExecutable(
+      path::join(tasks[2].runDirectory.value, COMMAND_NAME)));
+}
+
+
+// Tests slave recovery of the fetcher cache. The cache must be
+// wiped clean on recovery, causing renewed downloads.
+TEST_F(FetcherCacheHttpTest, HttpCachedRecovery)
+{
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(httpServer->url() + COMMAND_NAME);
+    uri.set_executable(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    const string path = path::join(task.runDirectory.value, COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(1u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+    // content-length requests: 1
+    // downloads: 1
+    EXPECT_EQ(2u, httpServer->countCommandRequests);
+  }
+
+  stopSlave();
+
+  // Start over.
+  httpServer->resetCounts();
+
+  // Don't reuse the old fetcher, which has stale state after
+  // stopping the slave.
+  Fetcher fetcher2;
+
+  Try<MesosContainerizer*> c =
+    MesosContainerizer::create(flags, true, &fetcher2);
+  CHECK_SOME(c);
+  containerizer = c.get();
+
+  // Set up so we can wait until the new slave updates the container's
+  // resources (this occurs after the executor has re-registered).
+  Future<Nothing> update =
+    FUTURE_DISPATCH(_, &MesosContainerizerProcess::update);
+
+  Try<PID<Slave>> pid = StartSlave(containerizer, flags);
+  CHECK_SOME(pid);
+  slavePid = pid.get();
+
+  // Wait until the containerizer is updated.
+  AWAIT_READY(update);
+
+  // Recovery must have cleaned the cache by now.
+  EXPECT_FALSE(os::exists(cacheDirectory));
+
+  // Repeat of the above to see if it works the same.
+  for (size_t i = 0; i < 3; i++) {
+    CommandInfo::URI uri;
+    uri.set_value(httpServer->url() + COMMAND_NAME);
+    uri.set_executable(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i));
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    const string path =
+      path::join(task.runDirectory.value, COMMAND_NAME);
+    EXPECT_TRUE(isExecutable(path));
+    EXPECT_TRUE(os::exists(path + taskName(i)));
+
+    EXPECT_EQ(1u, fetcherProcess->cacheSize());
+    EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+    EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+    // content-length requests: 1
+    // downloads: 1
+    EXPECT_EQ(2u, httpServer->countCommandRequests);
+  }
+}
+
+
+// Tests cache eviction. Limits the available cache space, then fetches
+// more task scripts than fit into the cache and runs them all. We
+// observe how the number of cache files rises and then stays constant.
+TEST_F(FetcherCacheTest, SimpleEviction)
+{
+  const size_t countCacheEntries = 3;
+
+  // Let only the first 'countCacheEntries' downloads fit in the cache.
+  flags.fetcher_cache_size = COMMAND_SCRIPT.size() * countCacheEntries;
+
+  startSlave();
+  driver->start();
+
+  for (size_t i = 0; i < countCacheEntries + 2; i++) {
+    string commandFilename = "cmd" + stringify(i);
+    string command = commandFilename + " " + taskName(i);
+
+    commandPath = path::join(assetsDirectory, commandFilename);
+    ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT));
+
+    CommandInfo::URI uri;
+    uri.set_value(commandPath);
+    uri.set_executable(true);
+    uri.set_cache(true);
+
+    CommandInfo commandInfo;
+    commandInfo.set_value("./" + command);
+    commandInfo.add_uris()->CopyFrom(uri);
+
+    const Task task = launchTask(commandInfo, i);
+
+    AWAIT_READY(awaitFinished(task));
+
+    // Check that the task succeeded.
+    EXPECT_TRUE(isExecutable(
+        path::join(task.runDirectory.value, commandFilename)));
+    EXPECT_TRUE(os::exists(
+        path::join(task.runDirectory.value, COMMAND_NAME + taskName(i))));
+
+    if (i < countCacheEntries) {
+      EXPECT_EQ(i + 1, fetcherProcess->cacheSize());
+      EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+      EXPECT_EQ(i+1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+    } else {
+      EXPECT_EQ(countCacheEntries, fetcherProcess->cacheSize());
+      EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+      EXPECT_EQ(countCacheEntries,
+                fetcherProcess->cacheFiles(slaveId, flags).get().size());
+    }
+  }
+}
+
+
+// Tests cache eviction fallback to bypassing the cache. A first task
+// runs normally. Then a second succeeds using eviction. Then a third
+// task fails to evict, but still gets executed bypassing the cache.
+TEST_F(FetcherCacheTest, FallbackFromEviction)
+{
+  // The size by which every task's URI download is going to be larger
+  // than the previous one.
+  const size_t growth = 10;
+
+  // Let only the first two downloads fit into the cache, one at a time,
+  // the second evicting the first. The third file won't fit any more,
+  // being larger than the entire cache.
+  flags.fetcher_cache_size = COMMAND_SCRIPT.size() + growth;
+
+  startSlave();
+  driver->start();
+
+  // We are going to run 3 tasks. These futures capture the FetcherInfo
+  // passed to each corresponding run() invocation, so we can inspect the
+  // chosen cache action afterwards.
+  Future<FetcherInfo> fetcherInfo0;
+  Future<FetcherInfo> fetcherInfo1;
+  Future<FetcherInfo> fetcherInfo2;
+  EXPECT_CALL(*fetcherProcess, run(_, _, _))
+    .WillOnce(DoAll(FutureArg<1>(&fetcherInfo0),
+                    Invoke(fetcherProcess,
+                           &MockFetcherProcess::unmocked_run)))
+    .WillOnce(DoAll(FutureArg<1>(&fetcherInfo1),
+                    Invoke(fetcherProcess,
+                           &MockFetcherProcess::unmocked_run)))
+    .WillOnce(DoAll(FutureArg<1>(&fetcherInfo2),
+                    Invoke(fetcherProcess,
+                           &MockFetcherProcess::unmocked_run)));
+
+
+  // Task 0:
+
+  const string commandFilename0 = "cmd0";
+  const string command0 = commandFilename0 + " " + taskName(0);
+
+  commandPath = path::join(assetsDirectory, commandFilename0);
+
+  // Write the command into the script that gets fetched.
+  ASSERT_SOME(os::write(commandPath, COMMAND_SCRIPT));
+
+  CommandInfo::URI uri0;
+  uri0.set_value(commandPath);
+  uri0.set_executable(true);
+  uri0.set_cache(true);
+
+  CommandInfo commandInfo0;
+  commandInfo0.set_value("./" + command0);
+  commandInfo0.add_uris()->CopyFrom(uri0);
+
+  const Task task0 = launchTask(commandInfo0, 0);
+
+  AWAIT_READY(awaitFinished(task0));
+
+  // Check that the task succeeded.
+  EXPECT_TRUE(isExecutable(
+      path::join(task0.runDirectory.value, commandFilename0)));
+  EXPECT_TRUE(os::exists(
+      path::join(task0.runDirectory.value, COMMAND_NAME + taskName(0))));
+
+  AWAIT_READY(fetcherInfo0);
+
+  EXPECT_EQ(1, fetcherInfo0.get().items_size());
+  EXPECT_EQ(FetcherInfo::Item::DOWNLOAD_AND_CACHE,
+            fetcherInfo0.get().items(0).action());
+
+  // We have put a file of size 'COMMAND_SCRIPT.size()' in the cache
+  // with space 'COMMAND_SCRIPT.size() + growth'. So we must have 'growth'
+  // space left.
+  CHECK_EQ(Bytes(growth), fetcherProcess->availableCacheSpace());
+
+  EXPECT_EQ(1u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+
+  // Task 1:
+
+  const string commandFilename1 = "cmd1";
+  const string command1 = commandFilename1 + " " + taskName(1);
+
+  commandPath = path::join(assetsDirectory, commandFilename1);
+
+  // Write the command into the script that gets fetched. Add 'growth'
+  // extra characters so the cache will fill up to the last byte.
+  ASSERT_SOME(os::write(
+      commandPath,
+      COMMAND_SCRIPT + std::string(growth, '\n')));
+
+  CommandInfo::URI uri1;
+  uri1.set_value(commandPath);
+  uri1.set_executable(true);
+  uri1.set_cache(true);
+
+  CommandInfo commandInfo1;
+  commandInfo1.set_value("./" + command1);
+  commandInfo1.add_uris()->CopyFrom(uri1);
+
+  const Task task1 = launchTask(commandInfo1, 1);
+
+  AWAIT_READY(awaitFinished(task1));
+
+  // Check that the task succeeded.
+  EXPECT_TRUE(isExecutable(
+      path::join(task1.runDirectory.value, commandFilename1)));
+  EXPECT_TRUE(os::exists(
+      path::join(task1.runDirectory.value, COMMAND_NAME + taskName(1))));
+
+  AWAIT_READY(fetcherInfo1);
+
+  EXPECT_EQ(1, fetcherInfo1.get().items_size());
+  EXPECT_EQ(FetcherInfo::Item::DOWNLOAD_AND_CACHE,
+            fetcherInfo1.get().items(0).action());
+
+  // The cache must now be full.
+  CHECK_EQ(Bytes(0u), fetcherProcess->availableCacheSpace());
+
+  EXPECT_EQ(1u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+
+
+  // Task 2:
+
+  const string commandFilename2 = "cmd2";
+  const string command2 = commandFilename2 + " " + taskName(2);
+
+  commandPath = path::join(assetsDirectory, commandFilename2);
+
+  // Write the command into the script that gets fetched. Add
+  // '2 * growth' now. Thus the file will be so big that it will not
+  // fit into the cache any more.
+  ASSERT_SOME(os::write(
+      commandPath,
+      COMMAND_SCRIPT + std::string(2 * growth, '\n')));
+
+  CommandInfo::URI uri2;
+  uri2.set_value(commandPath);
+  uri2.set_executable(true);
+  uri2.set_cache(true);
+
+  CommandInfo commandInfo2;
+  commandInfo2.set_value("./" + command2);
+  commandInfo2.add_uris()->CopyFrom(uri2);
+
+  const Task task2 = launchTask(commandInfo2, 2);
+
+  AWAIT_READY(awaitFinished(task2));
+
+  // Check that the task succeeded.
+  EXPECT_TRUE(isExecutable(
+      path::join(task2.runDirectory.value, commandFilename2)));
+  EXPECT_TRUE(os::exists(
+      path::join(task2.runDirectory.value, COMMAND_NAME + taskName(2))));
+
+  AWAIT_READY(fetcherInfo2);
+
+  EXPECT_EQ(1, fetcherInfo2.get().items_size());
+  EXPECT_EQ(FetcherInfo::Item::BYPASS_CACHE,
+            fetcherInfo2.get().items(0).action());
+
+  EXPECT_EQ(1u, fetcherProcess->cacheSize());
+  EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags));
+  EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size());
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index 4549e6a..361d918 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -48,314 +48,154 @@
 #include "tests/mesos.hpp"
 #include "tests/utils.hpp"
 
+using namespace mesos::slave;
+
 using namespace process;
 
-using process::Subprocess;
-using process::Future;
+using mesos::fetcher::FetcherInfo;
 
 using mesos::internal::slave::Fetcher;
 
-using std::string;
+using process::Subprocess;
+using process::Future;
+
 using std::map;
+using std::string;
 
-using mesos::fetcher::FetcherInfo;
 
 namespace mesos {
 namespace internal {
 namespace tests {
 
-class FetcherEnvironmentTest : public ::testing::Test {};
+class FetcherTest : public TemporaryDirectoryTest {};
 
 
-TEST_F(FetcherEnvironmentTest, Simple)
+TEST_F(FetcherTest, FileURI)
 {
-  CommandInfo commandInfo;
-  CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(false);
+  string fromDir = path::join(os::getcwd(), "from");
+  ASSERT_SOME(os::mkdir(fromDir));
+  string testFile = path::join(fromDir, "test");
+  EXPECT_SOME(os::write(testFile, "data"));
 
-  string directory = "/tmp/directory";
-  Option<string> user = "user";
+  string localFile = path::join(os::getcwd(), "test");
+  EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
-  flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "/tmp/hadoop";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
-
-  EXPECT_EQ(2u, environment.size());
-
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
-
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
-
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
-
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
-}
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
-TEST_F(FetcherEnvironmentTest, MultipleURIs)
-{
   CommandInfo commandInfo;
-  CommandInfo::URI uri;
-  uri.set_value("hdfs:///uri1");
-  uri.set_executable(false);
-  commandInfo.add_uris()->MergeFrom(uri);
-  uri.set_value("hdfs:///uri2");
-  uri.set_executable(true);
-  commandInfo.add_uris()->MergeFrom(uri);
-
-  string directory = "/tmp/directory";
-  Option<string> user("user");
-
-  slave::Flags flags;
-  flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "/tmp/hadoop";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
-
-  EXPECT_EQ(2u, environment.size());
-
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+  CommandInfo::URI* uri = commandInfo.add_uris();
+  uri->set_value("file://" + testFile);
 
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_READY(fetch);
 
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_TRUE(os::exists(localFile));
 }
 
 
-TEST_F(FetcherEnvironmentTest, NoUser)
+// Negative test: invalid user name. Copied from the FileURI test, so this
+// normally would succeed, but here a bogus user name is specified.
+// So we check for fetch failure.
+TEST_F(FetcherTest, InvalidUser)
 {
-  CommandInfo commandInfo;
-  CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(false);
+  string fromDir = path::join(os::getcwd(), "from");
+  ASSERT_SOME(os::mkdir(fromDir));
+  string testFile = path::join(fromDir, "test");
+  EXPECT_SOME(os::write(testFile, "data"));
 
-  string directory = "/tmp/directory";
+  string localFile = path::join(os::getcwd(), "test");
+  EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
   flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "/tmp/hadoop";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, None(), flags);
-
-  EXPECT_EQ(2u, environment.size());
-
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
-
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
-
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
-
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_FALSE(fetcherInfo.get().has_user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
-}
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
-TEST_F(FetcherEnvironmentTest, EmptyHadoop)
-{
   CommandInfo commandInfo;
-  CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(false);
+  commandInfo.set_user(UUID::random().toString());
 
-  string directory = "/tmp/directory";
-  Option<string> user = "user";
-
-  slave::Flags flags;
-  flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
+  CommandInfo::URI* uri = commandInfo.add_uris();
+  uri->set_value("file://" + testFile);
 
-  EXPECT_EQ(0u, environment.count("HADOOP_HOME"));
-  EXPECT_EQ(1u, environment.size());
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_FAILED(fetch);
 
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
+  // See FetcherProcess::fetch(); the failure message must mention "chown"
+  // in this case.
+  EXPECT_TRUE(strings::contains(fetch.failure(), "chown"));
 
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  EXPECT_FALSE(os::exists(localFile));
 }
 
 
-TEST_F(FetcherEnvironmentTest, NoHadoop)
+// Negative test: URI leading to a non-existing file. Copied from the
+// FileURI test, but here the resource is missing. So we check for fetch
+// failure.
+TEST_F(FetcherTest, NonExistingFile)
 {
-  CommandInfo commandInfo;
-  CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(false);
-
-  string directory = "/tmp/directory";
-  Option<string> user = "user";
+  string fromDir = path::join(os::getcwd(), "from");
+  ASSERT_SOME(os::mkdir(fromDir));
+  string testFile = path::join(fromDir, "nonExistingFile");
 
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
   flags.frameworks_home = "/tmp/frameworks";
 
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
-
-  EXPECT_EQ(0u, environment.count("HADOOP_HOME"));
-  EXPECT_EQ(1u, environment.size());
-
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
-
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
-
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
-}
-
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
-TEST_F(FetcherEnvironmentTest, NoExtractNoExecutable)
-{
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(false);
-  uri->set_extract(false);
-
-  string directory = "/tmp/directory";
-  Option<string> user = "user";
-
-  slave::Flags flags;
-  flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "/tmp/hadoop";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
-
-  EXPECT_EQ(2u, environment.size());
-
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
+  uri->set_value("file://" + testFile);
 
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_FAILED(fetch);
 
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
+  // See FetcherProcess::run().
+  EXPECT_TRUE(strings::contains(fetch.failure(), "Failed to fetch"));
 }
 
 
-TEST_F(FetcherEnvironmentTest, NoExtractExecutable)
+// Negative test: malformed URI, missing path.
+TEST_F(FetcherTest, MalformedURI)
 {
-  CommandInfo commandInfo;
-  CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("hdfs:///uri");
-  uri->set_executable(true);
-  uri->set_extract(false);
-
-  string directory = "/tmp/directory";
-  Option<string> user = "user";
-
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
   flags.frameworks_home = "/tmp/frameworks";
-  flags.hadoop_home = "/tmp/hadoop";
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, directory, user, flags);
-
-  EXPECT_EQ(2u, environment.size());
-
-  EXPECT_EQ(flags.hadoop_home, environment["HADOOP_HOME"]);
-
-  Try<JSON::Object> parse =
-    JSON::parse<JSON::Object>(environment["MESOS_FETCHER_INFO"]);
-  ASSERT_SOME(parse);
-
-  Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
-  ASSERT_SOME(fetcherInfo);
-
-  EXPECT_EQ(stringify(JSON::Protobuf(commandInfo)),
-            stringify(JSON::Protobuf(fetcherInfo.get().command_info())));
-  EXPECT_EQ(directory, fetcherInfo.get().work_directory());
-  EXPECT_EQ(user.get(), fetcherInfo.get().user());
-  EXPECT_EQ(flags.frameworks_home, fetcherInfo.get().frameworks_home());
-}
-
-
-class FetcherTest : public TemporaryDirectoryTest {};
-
 
-TEST_F(FetcherTest, FileURI)
-{
-  string fromDir = path::join(os::getcwd(), "from");
-  ASSERT_SOME(os::mkdir(fromDir));
-  string testFile = path::join(fromDir, "test");
-  EXPECT_FALSE(os::write(testFile, "data").isError());
-
-  string localFile = path::join(os::getcwd(), "test");
-  EXPECT_FALSE(os::exists(localFile));
-
-  slave::Flags flags;
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
-  uri->set_value("file://" + testFile);
-
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment);
+  uri->set_value("lala://nopath");
 
-  ASSERT_SOME(fetcherSubprocess);
-  Future<Option<int>> status = fetcherSubprocess.get().status();
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_FAILED(fetch);
 
-  EXPECT_TRUE(os::exists(localFile));
+  // See Fetcher::basename().
+  EXPECT_TRUE(strings::contains(fetch.failure(), "Malformed"));
 }
 
 
@@ -364,31 +204,27 @@ TEST_F(FetcherTest, AbsoluteFilePath)
   string fromDir = path::join(os::getcwd(), "from");
   ASSERT_SOME(os::mkdir(fromDir));
   string testPath = path::join(fromDir, "test");
-  EXPECT_FALSE(os::write(testPath, "data").isError());
+  EXPECT_SOME(os::write(testPath, "data"));
 
   string localFile = path::join(os::getcwd(), "test");
   EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(testPath);
 
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment);
-
-  ASSERT_SOME(fetcherSubprocess);
-  Future<Option<int>> status = fetcherSubprocess.get().status();
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_READY(fetch);
 
   EXPECT_TRUE(os::exists(localFile));
 }
@@ -399,55 +235,38 @@ TEST_F(FetcherTest, RelativeFilePath)
   string fromDir = path::join(os::getcwd(), "from");
   ASSERT_SOME(os::mkdir(fromDir));
   string testPath = path::join(fromDir, "test");
-  EXPECT_FALSE(os::write(testPath, "data").isError());
+  EXPECT_SOME(os::write(testPath, "data"));
 
   string localFile = path::join(os::getcwd(), "test");
   EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value("test");
 
-  // The first run must fail, because we have not set frameworks_home yet.
-
-  map<string, string> environment1 =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess1 =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment1);
-
-  ASSERT_SOME(fetcherSubprocess1);
-  Future<Option<int>> status1 = fetcherSubprocess1.get().status();
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  AWAIT_READY(status1);
-  ASSERT_SOME(status1.get());
+  // The first run must fail, because we have not set frameworks_home yet.
 
-  // mesos-fetcher always exits with EXIT(1) on failure.
-  EXPECT_EQ(1, WIFEXITED(status1.get().get()));
+  Future<Nothing> fetch1 = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_FAILED(fetch1);
 
   EXPECT_FALSE(os::exists(localFile));
 
   // The next run must succeed due to this flag.
   flags.frameworks_home = fromDir;
 
-  map<string, string> environment2 =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess2 =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment2);
-
-  ASSERT_SOME(fetcherSubprocess2);
-  Future<Option<int>> status2 = fetcherSubprocess2.get().status();
-
-  AWAIT_READY(status2);
-  ASSERT_SOME(status2.get());
-  EXPECT_EQ(0, status2.get().get());
+  Future<Nothing> fetch2 = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_READY(fetch2);
 
   EXPECT_TRUE(os::exists(localFile));
 }
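
The RelativeFilePath test above hinges on a single resolution rule: a relative URI is only fetchable when frameworks_home is configured, in which case the URI is joined onto that root. The following is a minimal sketch of that rule, not the fetcher's literal implementation; the error message is illustrative, and path::join / strings::startsWith are the usual stout helpers already used in these tests.

Try<string> resolveUri(const string& uri, const slave::Flags& flags)
{
  // Absolute paths are used as-is.
  if (strings::startsWith(uri, "/")) {
    return uri;
  }

  // A relative path needs frameworks_home as its root; otherwise the fetch
  // fails, which is what the first run in the test above verifies.
  if (flags.frameworks_home.empty()) {
    return Error("Relative URI '" + uri + "' given without frameworks_home");
  }

  return path::join(flags.frameworks_home, uri);
}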
@@ -481,26 +300,22 @@ TEST_F(FetcherTest, OSNetUriTest)
   EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
   flags.frameworks_home = "/tmp/frameworks";
 
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
+
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(url);
 
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment);
-
-  ASSERT_SOME(fetcherSubprocess);
-  Future<Option<int>> status = fetcherSubprocess.get().status();
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_READY(fetch);
 
   EXPECT_TRUE(os::exists(localFile));
 }
@@ -511,32 +326,27 @@ TEST_F(FetcherTest, FileLocalhostURI)
   string fromDir = path::join(os::getcwd(), "from");
   ASSERT_SOME(os::mkdir(fromDir));
   string testFile = path::join(fromDir, "test");
-  EXPECT_FALSE(os::write(testFile, "data").isError());
+  EXPECT_SOME(os::write(testFile, "data"));
 
   string localFile = path::join(os::getcwd(), "test");
   EXPECT_FALSE(os::exists(localFile));
 
   slave::Flags flags;
-  flags.frameworks_home = "/tmp/frameworks";
+  flags.launcher_dir = path::join(tests::flags.build_dir, "src");
+
+  ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
 
   CommandInfo commandInfo;
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path::join("file://localhost", testFile));
 
-  map<string, string> environment =
-    Fetcher::environment(commandInfo, os::getcwd(), None(), flags);
-
-  Try<Subprocess> fetcherSubprocess =
-    process::subprocess(
-      path::join(mesos::internal::tests::flags.build_dir, "src/mesos-fetcher"),
-      environment);
-
-  ASSERT_SOME(fetcherSubprocess);
-  Future<Option<int>> status = fetcherSubprocess.get().status();
+  Fetcher fetcher;
+  SlaveID slaveId;
 
-  AWAIT_READY(status);
-  ASSERT_SOME(status.get());
-  EXPECT_EQ(0, status.get().get());
+  Future<Nothing> fetch = fetcher.fetch(
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
+  AWAIT_READY(fetch);
 
   EXPECT_TRUE(os::exists(localFile));
 }
@@ -561,20 +371,11 @@ TEST_F(FetcherTest, NoExtractNotExecutable)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
-  Option<int> stdout = None();
-  Option<int> stderr = None();
-
-  // Redirect mesos-fetcher output if running the tests verbosely.
-  if (tests::flags.verbose) {
-    stdout = STDOUT_FILENO;
-    stderr = STDERR_FILENO;
-  }
-
   Fetcher fetcher;
+  SlaveID slaveId;
 
   Future<Nothing> fetch = fetcher.fetch(
-      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
-
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
   AWAIT_READY(fetch);
 
   Try<string> basename = os::basename(path.get());
@@ -611,19 +412,11 @@ TEST_F(FetcherTest, NoExtractExecutable)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
-  Option<int> stdout = None();
-  Option<int> stderr = None();
-
-  // Redirect mesos-fetcher output if running the tests verbosely.
-  if (tests::flags.verbose) {
-    stdout = STDOUT_FILENO;
-    stderr = STDERR_FILENO;
-  }
-
   Fetcher fetcher;
+  SlaveID slaveId;
 
   Future<Nothing> fetch = fetcher.fetch(
-      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
 
   AWAIT_READY(fetch);
 
@@ -669,19 +462,11 @@ TEST_F(FetcherTest, ExtractNotExecutable)
   slave::Flags flags;
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
-  Option<int> stdout = None();
-  Option<int> stderr = None();
-
-  // Redirect mesos-fetcher output if running the tests verbosely.
-  if (tests::flags.verbose) {
-    stdout = STDOUT_FILENO;
-    stderr = STDERR_FILENO;
-  }
-
   Fetcher fetcher;
+  SlaveID slaveId;
 
   Future<Nothing> fetch = fetcher.fetch(
-      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
 
   AWAIT_READY(fetch);
 
@@ -768,19 +553,11 @@ TEST_F(FetcherTest, HdfsURI)
   CommandInfo::URI* uri = commandInfo.add_uris();
   uri->set_value(path::join("hdfs://localhost", testFile));
 
-  Option<int> stdout = None();
-  Option<int> stderr = None();
-
-  // Redirect mesos-fetcher output if running the tests verbosely.
-  if (tests::flags.verbose) {
-    stdout = STDOUT_FILENO;
-    stderr = STDERR_FILENO;
-  }
-
   Fetcher fetcher;
+  SlaveID slaveId;
 
   Future<Nothing> fetch = fetcher.fetch(
-      containerId, commandInfo, os::getcwd(), None(), flags, stdout, stderr);
+      containerId, commandInfo, os::getcwd(), None(), slaveId, flags);
 
   AWAIT_READY(fetch);
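
Across all of the tests above the change is the same: instead of assembling a MESOS_FETCHER_INFO environment and launching the mesos-fetcher binary via process::subprocess, each test now drives the fetcher through its in-process API. The fragment below sketches that recurring calling pattern; it assumes the surrounding setup from the tests above (a commandInfo carrying the URIs and slave::Flags with launcher_dir set), and the default-constructed SlaveID mirrors what the tests do.

  // Common shape of the updated tests: the SlaveID parameter is new with the
  // cache-enabled fetcher, and the explicit stdout/stderr redirection
  // arguments of the old signature are gone.
  ContainerID containerId;
  containerId.set_value(UUID::random().toString());

  SlaveID slaveId;  // Default-initialized, as in the tests above.

  Fetcher fetcher;

  Future<Nothing> fetch = fetcher.fetch(
      containerId,
      commandInfo,   // CommandInfo with one or more URIs to fetch.
      os::getcwd(),  // Sandbox directory to fetch into.
      None(),        // No user to switch to.
      slaveId,
      flags);        // slave::Flags with launcher_dir (and, where needed,
                     // frameworks_home) set.

  AWAIT_READY(fetch);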
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 1d5639c..d7a3c06 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -48,6 +48,7 @@
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 
+using std::list;
 using std::shared_ptr;
 using std::string;
 using testing::_;
@@ -140,6 +141,7 @@ slave::Flags MesosTest::CreateSlaveFlags()
   CHECK_SOME(directory) << "Failed to create temporary directory";
 
   flags.work_dir = directory.get();
+  flags.fetcher_cache_dir = path::join(directory.get(), "fetch");
 
   flags.launcher_dir = path::join(tests::flags.build_dir, "src");
 
@@ -445,6 +447,47 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future)
 }
 
 
+MockFetcherProcess::MockFetcherProcess()
+{
+  // Set up default behaviors, calling the original methods.
+  EXPECT_CALL(*this, _fetch(_, _, _, _, _, _, _)).
+    WillRepeatedly(
+        Invoke(this, &MockFetcherProcess::unmocked__fetch));
+  EXPECT_CALL(*this, run(_, _, _)).
+    WillRepeatedly(Invoke(this, &MockFetcherProcess::unmocked_run));
+}
+
+
+process::Future<Nothing> MockFetcherProcess::unmocked__fetch(
+  const list<Future<shared_ptr<Cache::Entry>>> futures,
+  const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
+    entries,
+  const ContainerID& containerId,
+  const string& sandboxDirectory,
+  const string& cacheDirectory,
+  const Option<string>& user,
+  const slave::Flags& flags)
+{
+  return slave::FetcherProcess::_fetch(
+      futures,
+      entries,
+      containerId,
+      sandboxDirectory,
+      cacheDirectory,
+      user,
+      flags);
+}
+
+
+process::Future<Nothing> MockFetcherProcess::unmocked_run(
+    const ContainerID& containerId,
+    const FetcherInfo& info,
+    const slave::Flags& flags)
+{
+  return slave::FetcherProcess::run(containerId, info, flags);
+}
+
+
 slave::Flags ContainerizerTest<slave::MesosContainerizer>::CreateSlaveFlags()
 {
   slave::Flags flags = MesosTest::CreateSlaveFlags();

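
The MockFetcherProcess constructor above installs default expectations that delegate _fetch() and run() back to the real FetcherProcess, so a test only overrides the step it cares about. The fragment below is a hypothetical test body illustrating that; it assumes, as the fetcher cache tests do, that a Fetcher can be constructed around an Owned<FetcherProcess> for injection.

  MockFetcherProcess* fetcherProcess = new MockFetcherProcess();
  Fetcher fetcher(process::Owned<slave::FetcherProcess>(fetcherProcess));

  // Fail the first fetcher run to exercise the error path; once this
  // expectation saturates and retires, later calls fall through to the
  // defaults set in the constructor, i.e. the real FetcherProcess::run().
  EXPECT_CALL(*fetcherProcess, run(_, _, _))
    .WillOnce(Return(process::Failure("Simulated fetch failure")))
    .RetiresOnSaturation();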
http://git-wip-us.apache.org/repos/asf/mesos/blob/edd35b05/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ac986a0..a1c6ae4 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -30,6 +30,8 @@
 
 #include <mesos/master/allocator.hpp>
 
+#include <mesos/fetcher/fetcher.hpp>
+
 #include <mesos/slave/resource_estimator.hpp>
 
 #include <process/future.hpp>
@@ -44,6 +46,7 @@
 #include <stout/foreach.hpp>
 #include <stout/gtest.hpp>
 #include <stout/lambda.hpp>
+#include <stout/memory.hpp>
 #include <stout/none.hpp>
 #include <stout/option.hpp>
 #include <stout/stringify.hpp>
@@ -799,6 +802,52 @@ private:
 };
 
 
+// Definition of a mock FetcherProcess to be used in tests with gmock.
+class MockFetcherProcess : public slave::FetcherProcess
+{
+public:
+  MockFetcherProcess();
+
+  virtual ~MockFetcherProcess() {}
+
+  MOCK_METHOD7(_fetch, process::Future<Nothing>(
+      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
+        futures,
+      const hashmap<
+          CommandInfo::URI,
+          Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
+        entries,
+      const ContainerID& containerId,
+      const std::string& sandboxDirectory,
+      const std::string& cacheDirectory,
+      const Option<std::string>& user,
+      const slave::Flags& flags));
+
+  process::Future<Nothing> unmocked__fetch(
+      const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
+        futures,
+      const hashmap<
+          CommandInfo::URI,
+          Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
+        entries,
+      const ContainerID& containerId,
+      const std::string& sandboxDirectory,
+      const std::string& cacheDirectory,
+      const Option<std::string>& user,
+      const slave::Flags& flags);
+
+  MOCK_METHOD3(run, process::Future<Nothing>(
+      const ContainerID& containerId,
+      const FetcherInfo& info,
+      const slave::Flags& flags));
+
+  process::Future<Nothing> unmocked_run(
+      const ContainerID& containerId,
+      const FetcherInfo& info,
+      const slave::Flags& flags);
+};
+
+
 // Definition of a MockAuthorizer that can be used in tests with gmock.
 class MockAuthorizer : public Authorizer
 {

