singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wang...@apache.org
Subject [2/4] incubator-singa git commit: SINGA-11 Start SINGA on Apache Mesos
Date Wed, 21 Oct 2015 05:30:07 GMT
SINGA-11 Start SINGA on Apache Mesos

Refined Makefile to use relative path to SINGA header files.

Removed singa.conf file from the directory. The file can be found on SINGA's top directory.

Refined singa_scheduler.cc to follow the project's coding style.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/96c440ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/96c440ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/96c440ff

Branch: refs/heads/master
Commit: 96c440ffc0106a8be6b93e86a864854abe7b7d58
Parents: f0e1629
Author: Anh Dinh <ug93tad@gmail.com>
Authored: Tue Oct 20 11:08:05 2015 +0800
Committer: Anh Dinh <ug93tad@gmail.com>
Committed: Tue Oct 20 11:41:40 2015 +0800

----------------------------------------------------------------------
 tool/mesos/Makefile           |   2 +-
 tool/mesos/singa.conf         |   7 -
 tool/mesos/singa_scheduler.cc | 717 ++++++++++++++++++-------------------
 3 files changed, 351 insertions(+), 375 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/96c440ff/tool/mesos/Makefile
----------------------------------------------------------------------
diff --git a/tool/mesos/Makefile b/tool/mesos/Makefile
index 09b250b..d64fb5b 100644
--- a/tool/mesos/Makefile
+++ b/tool/mesos/Makefile
@@ -1,4 +1,4 @@
-CXX_FLAGS=-I ../include -std=c++11 -I /usr/local/include/hdfs -I /root/incubator-singa/include
+CXX_FLAGS=-I ../include -std=c++11 -I /usr/local/include/hdfs -I ../../include
 LD_FLAGS=-lmesos -lsinga -lhdfs3
 EXE=scheduler
 OBJS=singa_scheduler.o scheduler.pb.o

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/96c440ff/tool/mesos/singa.conf
----------------------------------------------------------------------
diff --git a/tool/mesos/singa.conf b/tool/mesos/singa.conf
deleted file mode 100644
index f49e689..0000000
--- a/tool/mesos/singa.conf
+++ /dev/null
@@ -1,7 +0,0 @@
-# point to your active zookeeper service
-# this is comma separated host:port pairs, each corresponding to a zk server
-# e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zookeeper_host: "node0:2181"
-
-# set if you want to change log directory
-log_dir: "/tmp/singa-log/"

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/96c440ff/tool/mesos/singa_scheduler.cc
----------------------------------------------------------------------
diff --git a/tool/mesos/singa_scheduler.cc b/tool/mesos/singa_scheduler.cc
index 86a4ded..2474872 100644
--- a/tool/mesos/singa_scheduler.cc
+++ b/tool/mesos/singa_scheduler.cc
@@ -18,21 +18,19 @@
 * under the License.
 *
 *************************************************************/
-
+#include "singa/proto/job.pb.h"
+#include "singa/proto/singa.pb.h"
+#include "./scheduler.pb.h"
+#include "singa/utils/common.h"
 #include <stdio.h>
-#include <glog/logging.h>
 #include <stdlib.h>
-#include <mesos/scheduler.hpp>
 #include <string>
 #include <iostream>
 #include <fstream>
 #include <hdfs.h>
-#include "singa/proto/job.pb.h"
-#include "singa/proto/singa.pb.h"
-#include "scheduler.pb.h"
-#include "singa/utils/common.h"
+#include <mesos/scheduler.hpp>
 #include <google/protobuf/text_format.h>
-
+#include <glog/logging.h>
 /**
  * \file singa_scheduler.cc implements a framework for managing SINGA jobs.
  *  
@@ -71,369 +69,354 @@ using mesos::SchedulerDriver;
 using std::vector;
 using std::map;
 
-string usage = "	singa_scheduler <job_conf> [-scheduler_conf global_config] [-singa_conf
singa_config] \n"
-							 "	  job_conf: job configuration file\n"
-							 "    -scheduler_conf: optional, system-wide configuration file\n"
-							 "    -singa_conf: optional, singa global configuration file\n"; 
-
-string SINGA_CONFIG = "/singa/singa.conf"; 
-string DEFAULT_SCHEDULER_CONF = "scheduler.conf";
+const char usage[] = " singa_scheduler <job_conf> [-scheduler_conf global_config] [-singa_conf
singa_config] \n"
+                      " job_conf: job configuration file\n"
+                      " -scheduler_conf: optional, system-wide configuration file\n"
+                      " -singa_conf: optional, singa global configuration file\n";
 
+const char SINGA_CONFIG[] = "/singa/singa.conf";
+const char DEFAULT_SCHEDULER_CONF[] = "scheduler.conf";
 class SingaScheduler: public mesos::Scheduler {
-	public:
-
-		/**
-		 * Constructor, used when [sing_conf] is not given. Raise error if /singa/singa.conf is
not found
-		 * on HDFS. 
-		 *
-		 * @param namenode	address of HDFS namenode
-		 * @param job_conf_file	job configuration file
-		 * @param jc		job counter
-		 */
-		SingaScheduler(string namenode, string job_conf_file, int jc): 
-			job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc){
-
-				hdfs_handle_ = hdfs_connect(namenode.c_str()); 
-				if (hdfs_handle_){
-					if (hdfsExists(hdfs_handle_,SINGA_CONFIG.c_str())!=0)
-						LOG(ERROR) << SINGA_CONFIG << " is not found on HDFS. Please use -singa_conf
flag to upload the file"; 
-				}
-				else
-					LOG(ERROR) << "Failed to connect to HDFS"; 
-
-				ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_); 	
-				
-			}
-
-		/**
-		 * Constructor. It overwrites /singa/singa.conf on HDFS (created a new one if necessary).
 
-		 * The file contains zookeeper_host and log_dir values
-		 * It also parses the JobProto from job_config file
-		 *
-		 * @param namenode	address of HDFS namenode
-		 * @param singa_conf	singa global configuration file
-		 * @param job_conf_file	job configuration file
-		 */
-		SingaScheduler(string namenode, string job_conf_file, string singa_conf, int jc)
-			: job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false),
job_counter_(jc){
-
-				hdfs_handle_ = hdfs_connect(namenode); 
-				if (!hdfs_handle_ || !hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, singa_conf))
-					LOG(ERROR) << "Failed to connect to HDFS"; 
-			
-				ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_); 
-			}
-		virtual void registered(SchedulerDriver *driver,
-				const mesos::FrameworkID& frameworkId,
-				const mesos::MasterInfo& masterInfo) {
-		}
-
-		virtual void reregistered(SchedulerDriver *driver,
-				const mesos::MasterInfo& masterInfo) {
-		}
-
-		virtual void disconnected(SchedulerDriver *driver) {
-		}
-
-		/**
-		 * Handle resource offering from Mesos scheduler. It implements the simple/naive
-		 * scheduler:
-		 * + For each offer that contains enough CPUs, adds new tasks to the list
-		 * + Launch all the tasks when reaching the required number of tasks (nworkers_groups +
nserver_groups). 
-		 */
-		virtual void resourceOffers(SchedulerDriver* driver,	const 
-				std::vector<mesos::Offer>& offers) {
-
-			// do nothing if the task is already running
-			if (is_running_)
-				return; 
-
-			for (int i = 0; i < offers.size(); i++) {
-				const mesos::Offer offer = offers[i];
-
-				// check for resource and create temporary tasks 
-				int cpus = 0, mem = 0;
-				int nresources = offer.resources().size();
-
-				for (int r = 0; r < nresources; r++) {
-					const mesos::Resource& resource = offer.resources(r);
-					if (resource.name() == "cpus"
-							&& resource.type() == mesos::Value::SCALAR)
-						cpus = resource.scalar().value();
-					else if (resource.name() == "mem"
-							&& resource.type() == mesos::Value::SCALAR)
-						mem = resource.scalar().value();
-				}
-
-				if (!check_resources(cpus)) 
-					break; 
-
-				vector<mesos::TaskInfo> *new_tasks = new vector<mesos::TaskInfo>();
-				mesos::TaskInfo task;
-				task.set_name("SINGA");
-
-				char string_id[100];
-				sprintf(string_id, "SINGA_%d", nhosts_); 
-				task.mutable_task_id()->set_value(string_id);
-				task.mutable_slave_id()->MergeFrom(offer.slave_id());
-
-				mesos::Resource *resource = task.add_resources();
-				resource->set_name("cpus");
-				resource->set_type(mesos::Value::SCALAR);
-				// take only nworkers_per_group CPUs
-				resource->mutable_scalar()->set_value(job_conf_.cluster().nworkers_per_group());
-
-				resource = task.add_resources();
-				resource->set_name("mem");
-				resource->set_type(mesos::Value::SCALAR);
-				// take all the memory
-				resource->mutable_scalar()->set_value(mem);
-
-				// store in temporary map
-				new_tasks->push_back(task);
-				tasks_[offer.id().value()] = new_tasks;
-
-				nhosts_++;
-			}
-
-			if (nhosts_>= job_conf_.cluster().nworker_groups()){
-				LOG(INFO) << "Acquired enough resources: " 
-					<< job_conf_.cluster().nworker_groups()*job_conf_.cluster().nworkers_per_group()
-					<< " CPUs over " << job_conf_.cluster().nworker_groups() << " hosts.
Launching tasks ... "; 
-
-				//write job_conf_file_ to /singa/job_id/job.conf
-				char path[512];
-				sprintf(path, "/singa/%d/job.conf",job_counter_); 
-				hdfs_overwrite(hdfs_handle_, path, job_conf_file_); 
-
-				int job_count=0; 
-				// launch tasks
-				for (map<string, vector<mesos::TaskInfo>*>::iterator it =
-						tasks_.begin(); it != tasks_.end(); ++it) {
-					prepare_tasks(it->second, job_counter_, path); 	
-					mesos::OfferID newId;
-					newId.set_value(it->first);
-					LOG(INFO) << "Launching task with offer ID = " << newId.value(); 
-					driver->launchTasks(newId, *(it->second));
-					job_count++;
-					if (job_count>= job_conf_.cluster().nworker_groups())
-						break;
-				}
-
-				job_counter_++; 
-				is_running_ = true; 
-			}
-		}
-
-		virtual void offerRescinded(SchedulerDriver *driver,
-				const mesos::OfferID& offerId) {
-		}
-
-		virtual void statusUpdate(SchedulerDriver* driver,
-				const mesos::TaskStatus& status) {
-			if (status.state() == mesos::TASK_FINISHED)
-				driver->stop();
-			else if (status.state() == mesos::TASK_FAILED){
-				LOG(ERROR) << "TASK FAILED !!!!"; 
-				driver->abort();
-			}
-		}
-
-		virtual void frameworkMessage(SchedulerDriver* driver,
-				const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
-				const string& data) {
-		}
-
-		virtual void slaveLost(SchedulerDriver* driver,
-				const mesos::SlaveID& slaveId) {
-		}
-
-		virtual void executorLost(SchedulerDriver* driver,
-				const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
-				int status) {
-		}
-
-		virtual void error(SchedulerDriver* driver, const string& message) {
-			LOG(ERROR) << "ERROR !!! " << message;
-		}
-
-	private:
-
-		/**
-		 * Helper function that initialize TaskInfo with the correct URI and command
-		 */
-		void prepare_tasks(vector<mesos::TaskInfo> *tasks, int job_id, string job_conf){
-			char path_sys_config[512], path_job_config[512];
-			//path to singa.conf
-			sprintf(path_sys_config, "hdfs://%s%s", namenode_.c_str(), SINGA_CONFIG.c_str()); 
-			sprintf(path_job_config, "hdfs://%s%s", namenode_.c_str(), job_conf.c_str()); 
-
-			char command[512];
-			sprintf(command, "singa -conf ./job.conf -singa_conf ./singa.conf -singa_job %d", job_id);

-
-			for (int i=0; i<tasks->size(); i++){
-				mesos::CommandInfo *comm = (tasks->at(i)).mutable_command();
-				comm->add_uris()->set_value(path_sys_config); 
-				comm->add_uris()->set_value(path_job_config); 
-				comm->set_value(command); 
-			}
-		}
-
-		/**
-		 * Helper function to connect to HDFS
-		 */
-		hdfsFS hdfs_connect(string namenode){
-			string path(namenode);
-			int idx = path.find_first_of(":"); 
-			string host=path.substr(0,idx);
-			int port = atoi(path.substr(idx+1).c_str()); 
-			return hdfsConnect(host.c_str(), port); 
-		}
-
-		/**
-		 * Helper function to read HDFS file content into a string. 
-		 * It assumes the file exists. 
-		 * @return NULL if there's error. 
-		 */
-		string hdfs_read(hdfsFS hdfs_handle, string filename){
-			hdfsFileInfo* stat = hdfsGetPathInfo(hdfs_handle, filename.c_str()); 
-			char buffer[stat->mSize]; 
-			hdfsFile file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_RDONLY, 0, 0, 0); 
-			int status = hdfsRead(hdfs_handle, file, buffer, stat->mSize); 
-			hdfsFreeFileInfo(stat,1); 
-			hdfsCloseFile(hdfs_handle, file); 
-			if (status!=-1)
-				return string(buffer); 
-			else
-				return NULL; 
-		}
-
-		/**
-		 * Helper function that write content of source_file to filename, overwritting the latter

-		 * if it exists. 
-		 * @return 1 if sucessfull, 0 if fail. 
-		 */
-		int hdfs_overwrite(hdfsFS hdfs_handle, string filename, string source_file){
-			hdfsFile file; 
-			if (hdfsExists(hdfs_handle, filename.c_str())==0){
-				file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0); 
-			}
-			else{
-				//create directory and file
-				int last_idx = filename.find_last_of("/"); 
-				string dir = filename.substr(0,last_idx); 
-				hdfsCreateDirectory(hdfs_handle, dir.c_str()); 
-				file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0); 
-			}
-
-			FILE *fh = fopen(source_file.c_str(), "r"); 
-			if (!fh){
-				LOG(ERROR) << "Cannot open " << source_file; 
-				return 0; 
-			}
-
-			if (file){
-				fseek(fh,0,SEEK_END); 
-				int len = ftell(fh); 
-				rewind(fh); 
-				char buf[len];
-				fread(buf,len,1,fh);
-				fclose(fh);
-
-				hdfsWrite(hdfs_handle, file, buf, len); 
-				hdfsFlush(hdfs_handle, file); 
-				hdfsCloseFile(hdfs_handle, file); 
-			}
-			else{
-				LOG(ERROR) << "ERROR openng file on HDFS " << filename; 
-				return 0; 
-			}
-
-			return 1; 
-		}
-
-		/**
-		 * Helper function, check if the offered CPUs satisfies the resource requirements
-		 * @param ncpus:	number of cpus offer at this host
-		 * @return true		when ncpus >= (nWorkersPerProcess + nServersPerProcess) if workers
and servers are separated
-		 *								or when cpus >= max(nWorkersPerProcess, nServersPerProcess) if they are not.

-		 */
-		bool check_resources(int ncpus){
-			int n1 = job_conf_.cluster().nworkers_per_procs();
-			int n2 = job_conf_.cluster().nservers_per_procs();
-			LOG(INFO) << "n1 = " << n1 << " n2 = " << n2 << " ncpus
= " << ncpus; 
-			return job_conf_.cluster().server_worker_separate()? ncpus >= (n1+n2) 
-															: ncpus >= (n1>n2 ? n1 : n2); 
-		}
-
-		int job_counter_; 
-
-		// true if the job has been launched
-		bool is_running_; 
-		singa::JobProto job_conf_; 	
-		// total number of hosts required
-		int nhosts_;
-		// temporary map of tasks: <offerID, TaskInfo> 
-		map<string, vector<mesos::TaskInfo>*> tasks_;
-		// SINGA job config file
-		string job_conf_file_; 
-		// HDFS namenode
-		string namenode_; 
-		// handle to HDFS
-		hdfsFS hdfs_handle_;
+ public:
+    /**
+     * Constructor, used when [sing_conf] is not given. Raise error if /singa/singa.conf
is not found
+     * on HDFS. 
+     *
+     * @param namenode	address of HDFS namenode
+     * @param job_conf_file	job configuration file
+     * @param jc		job counter
+     */
+    SingaScheduler(string namenode, string job_conf_file, int jc):
+      job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false),
job_counter_(jc) {
+        hdfs_handle_ = hdfs_connect(namenode.c_str());
+        if (hdfs_handle_) {
+          if (hdfsExists(hdfs_handle_, SINGA_CONFIG) != 0)
+            LOG(ERROR) << SINGA_CONFIG << " is not found on HDFS. Please use
-singa_conf flag to upload the file";
+        } else {
+          LOG(ERROR) << "Failed to connect to HDFS";
+        }
+        ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_);
+    }
+    /**
+     * Constructor. It overwrites /singa/singa.conf on HDFS (created a new one if necessary).
 
+     * The file contains zookeeper_host and log_dir values
+     * It also parses the JobProto from job_config file
+     *
+     * @param namenode	address of HDFS namenode
+     * @param singa_conf	singa global configuration file
+     * @param job_conf_file	job configuration file
+     */
+    SingaScheduler(string namenode, string job_conf_file, string singa_conf, int jc)
+      : job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false),
job_counter_(jc) {
+        hdfs_handle_ = hdfs_connect(namenode);
+        if (!hdfs_handle_ || !hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, singa_conf))
+          LOG(ERROR) << "Failed to connect to HDFS";
+
+        ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_);
+      }
+    virtual void registered(SchedulerDriver *driver,
+        const mesos::FrameworkID& frameworkId,
+        const mesos::MasterInfo& masterInfo) {
+    }
+
+    virtual void reregistered(SchedulerDriver *driver,
+        const mesos::MasterInfo& masterInfo) {
+    }
+
+    virtual void disconnected(SchedulerDriver *driver) {
+    }
+
+    /**
+     * Handle resource offering from Mesos scheduler. It implements the simple/naive
+     * scheduler:
+     * + For each offer that contains enough CPUs, adds new tasks to the list
+     * + Launch all the tasks when reaching the required number of tasks (nworkers_groups
+ nserver_groups). 
+     */
+    virtual void resourceOffers(SchedulerDriver* driver, const std::vector<mesos::Offer>&
offers) {
+      // do nothing if the task is already running
+      if (is_running_)
+        return;
+
+      for (int i = 0; i < offers.size(); i++) {
+        const mesos::Offer offer = offers[i];
+        // check for resource and create temporary tasks
+        int cpus = 0, mem = 0;
+        int nresources = offer.resources().size();
+
+        for (int r = 0; r < nresources; r++) {
+          const mesos::Resource& resource = offer.resources(r);
+          if (resource.name() == "cpus"
+              && resource.type() == mesos::Value::SCALAR)
+            cpus = resource.scalar().value();
+          else if (resource.name() == "mem"
+              && resource.type() == mesos::Value::SCALAR)
+            mem = resource.scalar().value();
+        }
+
+        if (!check_resources(cpus))
+          break;
+
+        vector<mesos::TaskInfo> *new_tasks = new vector<mesos::TaskInfo>();
+        mesos::TaskInfo task;
+        task.set_name("SINGA");
+
+        char string_id[100];
+        snprintf(string_id, 100, "SINGA_%d", nhosts_);
+        task.mutable_task_id()->set_value(string_id);
+        task.mutable_slave_id()->MergeFrom(offer.slave_id());
+
+        mesos::Resource *resource = task.add_resources();
+        resource->set_name("cpus");
+        resource->set_type(mesos::Value::SCALAR);
+        // take only nworkers_per_group CPUs
+        resource->mutable_scalar()->set_value(job_conf_.cluster().nworkers_per_group());
+
+        resource = task.add_resources();
+        resource->set_name("mem");
+        resource->set_type(mesos::Value::SCALAR);
+        // take all the memory
+        resource->mutable_scalar()->set_value(mem);
+
+        // store in temporary map
+        new_tasks->push_back(task);
+        tasks_[offer.id().value()] = new_tasks;
+
+        nhosts_++;
+      }
+
+      if (nhosts_>= job_conf_.cluster().nworker_groups()) {
+        LOG(INFO) << "Acquired enough resources: "
+          << job_conf_.cluster().nworker_groups()*job_conf_.cluster().nworkers_per_group()
+          << " CPUs over " << job_conf_.cluster().nworker_groups() << "
hosts. Launching tasks ... ";
+
+        // write job_conf_file_ to /singa/job_id/job.conf
+        char path[512];
+        snprintf(path, 512, "/singa/%d/job.conf", job_counter_);
+        hdfs_overwrite(hdfs_handle_, path, job_conf_file_);
+
+        int job_count = 0;
+        // launch tasks
+        for (map<string, vector<mesos::TaskInfo>*>::iterator it =
+            tasks_.begin(); it != tasks_.end(); ++it) {
+          prepare_tasks(it->second, job_counter_, path);
+          mesos::OfferID newId;
+          newId.set_value(it->first);
+          LOG(INFO) << "Launching task with offer ID = " << newId.value();
+          driver->launchTasks(newId, *(it->second));
+          job_count++;
+          if (job_count>= job_conf_.cluster().nworker_groups())
+            break;
+        }
+
+        job_counter_++;
+        is_running_ = true;
+      }
+    }
+
+    virtual void offerRescinded(SchedulerDriver *driver,
+        const mesos::OfferID& offerId) {
+    }
+
+    virtual void statusUpdate(SchedulerDriver* driver,
+        const mesos::TaskStatus& status) {
+      if (status.state() == mesos::TASK_FINISHED)
+        driver->stop();
+      else if (status.state() == mesos::TASK_FAILED) {
+        LOG(ERROR) << "TASK FAILED !!!!";
+        driver->abort();
+      }
+    }
+
+    virtual void frameworkMessage(SchedulerDriver* driver,
+        const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
+        const string& data) {
+    }
+
+    virtual void slaveLost(SchedulerDriver* driver,
+        const mesos::SlaveID& slaveId) {
+    }
+
+    virtual void executorLost(SchedulerDriver* driver,
+        const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
+        int status) {
+    }
+
+    virtual void error(SchedulerDriver* driver, const string& message) {
+      LOG(ERROR) << "ERROR !!! " << message;
+    }
+
+ private:
+    /**
+     * Helper function that initialize TaskInfo with the correct URI and command
+     */
+    void prepare_tasks(vector<mesos::TaskInfo> *tasks, int job_id, string job_conf)
{
+      char path_sys_config[512], path_job_config[512];
+      // path to singa.conf
+      snprintf(path_sys_config, 512, "hdfs://%s%s", namenode_.c_str(), SINGA_CONFIG);
+      snprintf(path_job_config, 512, "hdfs://%s%s", namenode_.c_str(), job_conf.c_str());
+
+      char command[512];
+      snprintf(command, 512, "singa -conf ./job.conf -singa_conf ./singa.conf -singa_job
%d", job_id);
+
+      for (int i=0; i < tasks->size(); i++) {
+        mesos::CommandInfo *comm = (tasks->at(i)).mutable_command();
+        comm->add_uris()->set_value(path_sys_config);
+        comm->add_uris()->set_value(path_job_config);
+        comm->set_value(command);
+      }
+    }
+
+    /**
+     * Helper function to connect to HDFS
+     */
+    hdfsFS hdfs_connect(string namenode) {
+      string path(namenode);
+      int idx = path.find_first_of(":");
+      string host = path.substr(0, idx);
+      int port = atoi(path.substr(idx+1).c_str());
+      return hdfsConnect(host.c_str(), port);
+    }
+
+    /**
+     * Helper function to read HDFS file content into a string. 
+     * It assumes the file exists. 
+     * @return NULL if there's error. 
+     */
+    string hdfs_read(hdfsFS hdfs_handle, string filename) {
+      hdfsFileInfo* stat = hdfsGetPathInfo(hdfs_handle, filename.c_str());
+      int file_size = stat->mSize;
+      string buffer;
+      buffer.resize(file_size);
+
+      hdfsFile file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_RDONLY, 0, 0, 0);
+      int status = hdfsRead(hdfs_handle, file, const_cast<char*>(buffer.c_str()), stat->mSize);
+      hdfsFreeFileInfo(stat, 1);
+      hdfsCloseFile(hdfs_handle, file);
+      if (status != -1)
+        return string(buffer);
+      else
+        return NULL;
+    }
+
+    /**
+     * Helper function that write content of source_file to filename, overwritting the latter

+     * if it exists. 
+     * @return 1 if sucessfull, 0 if fail. 
+     */
+    int hdfs_overwrite(hdfsFS hdfs_handle, string filename, string source_file) {
+      hdfsFile file;
+      if (hdfsExists(hdfs_handle, filename.c_str()) == 0) {
+        file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0);
+      } else {
+        // create directory and file
+        int last_idx = filename.find_last_of("/");
+        string dir = filename.substr(0, last_idx);
+        hdfsCreateDirectory(hdfs_handle, dir.c_str());
+        file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0);
+      }
+
+      FILE *fh = fopen(source_file.c_str(), "r");
+      if (!fh) {
+        LOG(ERROR) << "Cannot open " << source_file;
+        return 0;
+      }
+
+      if (file) {
+        fseek(fh, 0, SEEK_END);
+        int len = ftell(fh);
+        rewind(fh);
+        string buf;
+        buf.resize(len);
+        fread(const_cast<char*>(buf.c_str()), len, 1, fh);
+        fclose(fh);
+
+        hdfsWrite(hdfs_handle, file, buf.c_str(), len);
+        hdfsFlush(hdfs_handle, file);
+        hdfsCloseFile(hdfs_handle, file);
+      } else {
+        LOG(ERROR) << "ERROR openng file on HDFS " << filename;
+        return 0;
+      }
+
+      return 1;
+    }
+
+    /**
+     * Helper function, check if the offered CPUs satisfies the resource requirements
+     * @param ncpus:	number of cpus offer at this host
+     * @return true		when ncpus >= (nWorkersPerProcess + nServersPerProcess) if workers
and servers are separated
+     *								or when cpus >= max(nWorkersPerProcess, nServersPerProcess) if they are
not. 
+     */
+    bool check_resources(int ncpus) {
+      int n1 = job_conf_.cluster().nworkers_per_procs();
+      int n2 = job_conf_.cluster().nservers_per_procs();
+      LOG(INFO) << "n1 = " << n1 << " n2 = " << n2 << " ncpus
= " << ncpus;
+      return job_conf_.cluster().server_worker_separate()? ncpus >= (n1+n2) : ncpus >=
(n1 > n2 ? n1 : n2);
+    }
+
+    int job_counter_;
+
+    // true if the job has been launched
+    bool is_running_;
+    singa::JobProto job_conf_;
+    // total number of hosts required
+    int nhosts_;
+    // temporary map of tasks: <offerID, TaskInfo>
+    map<string, vector<mesos::TaskInfo>*> tasks_;
+    // SINGA job config file
+    string job_conf_file_;
+    // HDFS namenode
+    string namenode_;
+    // handle to HDFS
+    hdfsFS hdfs_handle_;
 };
 
 int main(int argc, char** argv) {
-	FLAGS_logtostderr = 1;
-
-	int status = mesos::DRIVER_RUNNING;
-	SingaScheduler *scheduler; 
-
-	if (!(argc==2 || argc==4 || argc==6)){
-		std::cout << usage << std::endl; 
-		return 1; 
-	}
-
-	int scheduler_conf_idx=0;
-	int singa_conf_idx=0; 
-	for (int i=1; i<argc-1; i++){
-		if (strcmp(argv[i],"-scheduler_conf")==0)
-			scheduler_conf_idx=i+1;
-		if (strcmp(argv[i],"-singa_conf")==0)
-			singa_conf_idx=i+1; 
-	}
-
-	SchedulerProto msg; 
-	
-	if (scheduler_conf_idx)
-		singa::ReadProtoFromTextFile((const char*)argv[scheduler_conf_idx], &msg); 	
-	else
-		singa::ReadProtoFromTextFile(DEFAULT_SCHEDULER_CONF.c_str(), &msg); 	
-
-	if (!singa_conf_idx)
-		scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), msg.job_counter());
-	else
-		scheduler = new SingaScheduler(msg.namenode(),string(argv[1]), string(argv[singa_conf_idx]),
msg.job_counter()); 	
-
-	msg.set_job_counter(msg.job_counter()+1); 
-	if (scheduler_conf_idx)
-		singa::WriteProtoToTextFile(msg, (const char*)argv[scheduler_conf_idx]); 	
-	else
-		singa::WriteProtoToTextFile(msg, DEFAULT_SCHEDULER_CONF.c_str()); 	
-
-	LOG(INFO) << "Scheduler initialized"; 
-	mesos::FrameworkInfo framework;
-	framework.set_user("");
-	framework.set_name("SINGA");
-
-	SchedulerDriver *driver = new mesos::MesosSchedulerDriver(scheduler, framework,
-			msg.master().c_str());
-	LOG(INFO) << "Starting SINGA framework...";
-	status = driver->run();
-	driver->stop();
-	LOG(INFO) << "Stoping SINGA framework..."; 
-	
-		
-	return status == mesos::DRIVER_STOPPED ? 0 : 1;
+  FLAGS_logtostderr = 1;
+  int status = mesos::DRIVER_RUNNING;
+  SingaScheduler *scheduler;
+  if (!(argc == 2 || argc == 4 || argc == 6)) {
+    std::cout << usage << std::endl;
+    return 1;
+  }
+
+  int scheduler_conf_idx = 0;
+  int singa_conf_idx = 0;
+  for (int i=1; i < argc-1; i++) {
+    if (strcmp(argv[i], "-scheduler_conf") == 0)
+      scheduler_conf_idx = i+1;
+    if (strcmp(argv[i], "-singa_conf") == 0)
+      singa_conf_idx = i+1;
+  }
+
+  SchedulerProto msg;
+  if (scheduler_conf_idx)
+    singa::ReadProtoFromTextFile((const char*)argv[scheduler_conf_idx], &msg);
+  else
+    singa::ReadProtoFromTextFile(DEFAULT_SCHEDULER_CONF, &msg);
+
+  if (!singa_conf_idx)
+    scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), msg.job_counter());
+  else
+    scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), string(argv[singa_conf_idx]),
msg.job_counter());
+
+  msg.set_job_counter(msg.job_counter()+1);
+  if (scheduler_conf_idx)
+    singa::WriteProtoToTextFile(msg, (const char*)argv[scheduler_conf_idx]);
+  else
+    singa::WriteProtoToTextFile(msg, DEFAULT_SCHEDULER_CONF);
+
+  LOG(INFO) << "Scheduler initialized";
+  mesos::FrameworkInfo framework;
+  framework.set_user("");
+  framework.set_name("SINGA");
+
+  SchedulerDriver *driver = new mesos::MesosSchedulerDriver(scheduler, framework, msg.master().c_str());
+  LOG(INFO) << "Starting SINGA framework...";
+  status = driver->run();
+  driver->stop();
+  LOG(INFO) << "Stoping SINGA framework...";
+
+  return status == mesos::DRIVER_STOPPED ? 0 : 1;
 }
 


Mime
View raw message