singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhon...@apache.org
Subject [1/6] incubator-singa git commit: preparing for v1.1-rc1: remove outdated files; fix links
Date Tue, 24 Jan 2017 09:05:44 GMT
Repository: incubator-singa
Updated Branches:
  refs/heads/master d190fa89a -> 2a1b988c6


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2e21c62/tool/mesos/singa_scheduler.cc
----------------------------------------------------------------------
diff --git a/tool/mesos/singa_scheduler.cc b/tool/mesos/singa_scheduler.cc
deleted file mode 100644
index dc48ae2..0000000
--- a/tool/mesos/singa_scheduler.cc
+++ /dev/null
@@ -1,426 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-* 
-*   http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-#include "singa/proto/job.pb.h"
-#include "singa/proto/singa.pb.h"
-#include "./scheduler.pb.h"
-#include "singa/utils/common.h"
-#include <stdio.h>
-#include <stdlib.h>
-#include <string>
-#include <iostream>
-#include <fstream>
-#include <hdfs.h>
-#include <mesos/scheduler.hpp>
-#include <google/protobuf/text_format.h>
-#include <glog/logging.h>
-/**
- * \file singa_scheduler.cc implements a framework for managing SINGA jobs.
- *  
- * The scheduler takes a job configuration file [file] and performs the following:
- *	1. Parse the config file to determine the required resources. 
- *	2. [Optional] Copy singa.conf to the HDFS: /singa/singa.conf.
- *			2.1. Raise error if singa.conf is NOT FOUND on HDFS
- *	3. Wait for offers from the Mesos master until enough is acquired. 
- *	4. Keep an increasing counter of job ID.  
- *	5. Write [file] to HDFS: /singa/job_ID/job.conf
- *	6. Start the task:
- *			+ Set URI in the TaskInfo message to point to the config files on HDFS:
-							/singa/singa.conf
-							/singa/Job_ID/job.conf
- *			+ Set the executable command: singa-run.sh -conf ./job.conf
- *
- *	We assume that singa-run.sh is include in $PATH variable at all nodes. 
- *	(Else, we set the executable to its full path)
- *	./job.conf is relative path pointing to the current sandbox directory created
- *	dynamically by Mesos. 
- *
- *
- * Scheduling: 
- *	Each SINGA job requires certain resources represented by: (1) number of workers, (2) number
of worker groups
- *	and (3) number of worker per process. The resources offered by Mesos contains (1) number
of host, (2) number of CPUs
- *	at each host and (3) memory available at each hosts. 
- *
- *	Our scheduler performs simply task assignment which guarantees that each process runs
an entire work group,
- *	and each takes all the memory offered by the slave. We assume that each slave runs ONE
process, that is the following
- *	condition holds:
- *		nCPUs_per_host >= nWorkersPerProcess+nServersPerProcess //if seperate
-										 >= max(nWorkerPerProcess, nServersPerProcess) // if attached
- */
-using std::string;
-using mesos::SchedulerDriver;
-using std::vector;
-using std::map;
-
-const char usage[] = " singa_scheduler <job_conf> [-scheduler_conf global_config] [-singa_conf
singa_config] \n"
-                      " job_conf: job configuration file\n"
-                      " -scheduler_conf: optional, system-wide configuration file\n"
-                      " -singa_conf: optional, singa global configuration file\n";
-
-const char SINGA_CONFIG[] = "/singa/singa.conf";
-const char DEFAULT_SCHEDULER_CONF[] = "scheduler.conf";
-class SingaScheduler: public mesos::Scheduler {
- public:
-    /**
-     * Constructor, used when [sing_conf] is not given. Raise error if /singa/singa.conf
is not found
-     * on HDFS. 
-     *
-     * @param namenode	address of HDFS namenode
-     * @param job_conf_file	job configuration file
-     * @param jc		job counter
-     */
-    SingaScheduler(string namenode, string job_conf_file, int jc):
-      job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false),
job_counter_(jc), task_counter_(0) {
-        hdfs_handle_ = hdfs_connect(namenode.c_str());
-        if (hdfs_handle_) {
-          if (hdfsExists(hdfs_handle_, SINGA_CONFIG) != 0)
-            LOG(ERROR) << SINGA_CONFIG << " is not found on HDFS. Please use
-singa_conf flag to upload the file";
-        } else {
-          LOG(ERROR) << "Failed to connect to HDFS";
-        }
-        ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_);
-    }
-    /**
-     * Constructor. It overwrites /singa/singa.conf on HDFS (created a new one if necessary).
 
-     * The file contains zookeeper_host and log_dir values
-     * It also parses the JobProto from job_config file
-     *
-     * @param namenode	address of HDFS namenode
-     * @param singa_conf	singa global configuration file
-     * @param job_conf_file	job configuration file
-     */
-    SingaScheduler(string namenode, string job_conf_file, string singa_conf, int jc)
-      : job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false),
job_counter_(jc), task_counter_(0) {
-        hdfs_handle_ = hdfs_connect(namenode);
-        if (!hdfs_handle_ || !hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, singa_conf))
-          LOG(ERROR) << "Failed to connect to HDFS";
-
-        ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_);
-      }
-    virtual void registered(SchedulerDriver *driver,
-        const mesos::FrameworkID& frameworkId,
-        const mesos::MasterInfo& masterInfo) {
-    }
-
-    virtual void reregistered(SchedulerDriver *driver,
-        const mesos::MasterInfo& masterInfo) {
-    }
-
-    virtual void disconnected(SchedulerDriver *driver) {
-    }
-
-    /**
-     * Handle resource offering from Mesos scheduler. It implements the simple/naive
-     * scheduler:
-     * + For each offer that contains enough CPUs, adds new tasks to the list
-     * + Launch all the tasks when reaching the required number of tasks (nworkers_groups
+ nserver_groups). 
-     */
-    virtual void resourceOffers(SchedulerDriver* driver, const std::vector<mesos::Offer>&
offers) {
-      // do nothing if the task is already running
-      if (is_running_)
-        return;
-
-      for (int i = 0; i < offers.size(); i++) {
-        const mesos::Offer offer = offers[i];
-        // check for resource and create temporary tasks
-        int cpus = 0, mem = 0;
-        int nresources = offer.resources().size();
-
-        for (int r = 0; r < nresources; r++) {
-          const mesos::Resource& resource = offer.resources(r);
-          if (resource.name() == "cpus"
-              && resource.type() == mesos::Value::SCALAR)
-            cpus = resource.scalar().value();
-          else if (resource.name() == "mem"
-              && resource.type() == mesos::Value::SCALAR)
-            mem = resource.scalar().value();
-        }
-
-        if (!check_resources(cpus))
-          break;
-
-        vector<mesos::TaskInfo> *new_tasks = new vector<mesos::TaskInfo>();
-        mesos::TaskInfo task;
-        task.set_name("SINGA");
-
-        char string_id[100];
-        snprintf(string_id, 100, "SINGA_%d", nhosts_);
-        task.mutable_task_id()->set_value(string_id);
-        task.mutable_slave_id()->MergeFrom(offer.slave_id());
-
-        mesos::Resource *resource = task.add_resources();
-        resource->set_name("cpus");
-        resource->set_type(mesos::Value::SCALAR);
-        // take only nworkers_per_group CPUs
-        resource->mutable_scalar()->set_value(job_conf_.cluster().nworkers_per_group());
-
-        resource = task.add_resources();
-        resource->set_name("mem");
-        resource->set_type(mesos::Value::SCALAR);
-        // take all the memory
-        resource->mutable_scalar()->set_value(mem);
-
-        // store in temporary map
-        new_tasks->push_back(task);
-        tasks_[offer.id().value()] = new_tasks;
-        hostnames_[offer.id().value()] = offer.hostname();
-        nhosts_++;
-      }
-
-      if (nhosts_>= job_conf_.cluster().nworker_groups()) {
-        LOG(INFO) << "Acquired enough resources: "
-          << job_conf_.cluster().nworker_groups()*job_conf_.cluster().nworkers_per_group()
-          << " CPUs over " << job_conf_.cluster().nworker_groups() << "
hosts. Launching tasks ... ";
-
-        // write job_conf_file_ to /singa/job_id/job.conf
-        char path[512];
-        snprintf(path, 512, "/singa/%d/job.conf", job_counter_);
-        hdfs_overwrite(hdfs_handle_, path, job_conf_file_);
-
-        // launch tasks
-        for (map<string, vector<mesos::TaskInfo>*>::iterator it =
-            tasks_.begin(); it != tasks_.end(); ++it) {
-          prepare_tasks(it->second, hostnames_[it->first], job_counter_, path);
-          mesos::OfferID newId;
-          newId.set_value(it->first);
-          LOG(INFO) << "Launching task with offer ID = " << newId.value();
-          driver->launchTasks(newId, *(it->second));
-          task_counter_++;
-          if (task_counter_>= job_conf_.cluster().nworker_groups())
-            break;
-        }
-
-        job_counter_++;
-        is_running_ = true;
-      }
-    }
-
-    virtual void offerRescinded(SchedulerDriver *driver,
-        const mesos::OfferID& offerId) {
-    }
-
-    virtual void statusUpdate(SchedulerDriver* driver,
-        const mesos::TaskStatus& status) {
-      if (status.state() == mesos::TASK_FINISHED)
-        task_counter_--;
-
-      if (task_counter_ == 0) {
-        driver->stop();
-      } else if (status.state() == mesos::TASK_FAILED) {
-        LOG(ERROR) << "TASK FAILED !!!!";
-        driver->abort();
-      }
-    }
-
-    virtual void frameworkMessage(SchedulerDriver* driver,
-        const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
-        const string& data) {
-    }
-
-    virtual void slaveLost(SchedulerDriver* driver,
-        const mesos::SlaveID& slaveId) {
-    }
-
-    virtual void executorLost(SchedulerDriver* driver,
-        const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
-        int status) {
-    }
-
-    virtual void error(SchedulerDriver* driver, const string& message) {
-      LOG(ERROR) << "ERROR !!! " << message;
-    }
-
- private:
-    /**
-     * Helper function that initialize TaskInfo with the correct URI and command
-     */
-    void prepare_tasks(vector<mesos::TaskInfo> *tasks, string hostname, int job_id,
string job_conf) {
-      char path_sys_config[512], path_job_config[512];
-      // path to singa.conf
-      snprintf(path_sys_config, 512, "hdfs://%s%s", namenode_.c_str(), SINGA_CONFIG);
-      snprintf(path_job_config, 512, "hdfs://%s%s", namenode_.c_str(), job_conf.c_str());
-
-      char command[512];
-      snprintf(command, 512, "singa -conf ./job.conf -singa_conf ./singa.conf -singa_job
%d -host %s", job_id, hostname.c_str());
-
-      for (int i=0; i < tasks->size(); i++) {
-        mesos::CommandInfo *comm = (tasks->at(i)).mutable_command();
-        comm->add_uris()->set_value(path_sys_config);
-        comm->add_uris()->set_value(path_job_config);
-        comm->set_value(command);
-      }
-    }
-
-    /**
-     * Helper function to connect to HDFS
-     */
-    hdfsFS hdfs_connect(string namenode) {
-      string path(namenode);
-      int idx = path.find_first_of(":");
-      string host = path.substr(0, idx);
-      int port = atoi(path.substr(idx+1).c_str());
-      return hdfsConnect(host.c_str(), port);
-    }
-
-    /**
-     * Helper function to read HDFS file content into a string. 
-     * It assumes the file exists. 
-     * @return NULL if there's error. 
-     */
-    string hdfs_read(hdfsFS hdfs_handle, string filename) {
-      hdfsFileInfo* stat = hdfsGetPathInfo(hdfs_handle, filename.c_str());
-      int file_size = stat->mSize;
-      string buffer;
-      buffer.resize(file_size);
-
-      hdfsFile file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_RDONLY, 0, 0, 0);
-      int status = hdfsRead(hdfs_handle, file, const_cast<char*>(buffer.c_str()), stat->mSize);
-      hdfsFreeFileInfo(stat, 1);
-      hdfsCloseFile(hdfs_handle, file);
-      if (status != -1)
-        return string(buffer);
-      else
-        return NULL;
-    }
-
-    /**
-     * Helper function that write content of source_file to filename, overwritting the latter

-     * if it exists. 
-     * @return 1 if sucessfull, 0 if fail. 
-     */
-    int hdfs_overwrite(hdfsFS hdfs_handle, string filename, string source_file) {
-      hdfsFile file;
-      if (hdfsExists(hdfs_handle, filename.c_str()) == 0) {
-        file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0);
-      } else {
-        // create directory and file
-        int last_idx = filename.find_last_of("/");
-        string dir = filename.substr(0, last_idx);
-        hdfsCreateDirectory(hdfs_handle, dir.c_str());
-        file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0);
-      }
-
-      FILE *fh = fopen(source_file.c_str(), "r");
-      if (!fh) {
-        LOG(ERROR) << "Cannot open " << source_file;
-        return 0;
-      }
-
-      if (file) {
-        fseek(fh, 0, SEEK_END);
-        int len = ftell(fh);
-        rewind(fh);
-        string buf;
-        buf.resize(len);
-        fread(const_cast<char*>(buf.c_str()), len, 1, fh);
-        fclose(fh);
-
-        hdfsWrite(hdfs_handle, file, buf.c_str(), len);
-        hdfsFlush(hdfs_handle, file);
-        hdfsCloseFile(hdfs_handle, file);
-      } else {
-        LOG(ERROR) << "ERROR openng file on HDFS " << filename;
-        return 0;
-      }
-
-      return 1;
-    }
-
-    /**
-     * Helper function, check if the offered CPUs satisfies the resource requirements
-     * @param ncpus:	number of cpus offer at this host
-     * @return true		when ncpus >= (nWorkersPerProcess + nServersPerProcess) if workers
and servers are separated
-     *								or when cpus >= max(nWorkersPerProcess, nServersPerProcess) if they are
not. 
-     */
-    bool check_resources(int ncpus) {
-      int n1 = job_conf_.cluster().nworkers_per_procs();
-      int n2 = job_conf_.cluster().nservers_per_procs();
-      LOG(INFO) << "n1 = " << n1 << " n2 = " << n2 << " ncpus
= " << ncpus;
-      return job_conf_.cluster().server_worker_separate()? ncpus >= (n1+n2) : ncpus >=
(n1 > n2 ? n1 : n2);
-    }
-
-    int job_counter_, task_counter_;
-
-    // true if the job has been launched
-    bool is_running_;
-    singa::JobProto job_conf_;
-    // total number of hosts required
-    int nhosts_;
-    // temporary map of tasks: <offerID, TaskInfo>
-    map<string, vector<mesos::TaskInfo>*> tasks_;
-    // temporary map of offerID to slave IP addresses
-    map<string, string> hostnames_; 
-    // SINGA job config file
-    string job_conf_file_;
-    // HDFS namenode
-    string namenode_;
-    // handle to HDFS
-    hdfsFS hdfs_handle_;
-};
-
-int main(int argc, char** argv) {
-  FLAGS_logtostderr = 1;
-  int status = mesos::DRIVER_RUNNING;
-  SingaScheduler *scheduler;
-  if (!(argc == 2 || argc == 4 || argc == 6)) {
-    std::cout << usage << std::endl;
-    return 1;
-  }
-
-  int scheduler_conf_idx = 0;
-  int singa_conf_idx = 0;
-  for (int i=1; i < argc-1; i++) {
-    if (strcmp(argv[i], "-scheduler_conf") == 0)
-      scheduler_conf_idx = i+1;
-    if (strcmp(argv[i], "-singa_conf") == 0)
-      singa_conf_idx = i+1;
-  }
-
-  SchedulerProto msg;
-  if (scheduler_conf_idx)
-    singa::ReadProtoFromTextFile((const char*)argv[scheduler_conf_idx], &msg);
-  else
-    singa::ReadProtoFromTextFile(DEFAULT_SCHEDULER_CONF, &msg);
-
-  if (!singa_conf_idx)
-    scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), msg.job_counter());
-  else
-    scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), string(argv[singa_conf_idx]),
msg.job_counter());
-
-  msg.set_job_counter(msg.job_counter()+1);
-  if (scheduler_conf_idx)
-    singa::WriteProtoToTextFile(msg, (const char*)argv[scheduler_conf_idx]);
-  else
-    singa::WriteProtoToTextFile(msg, DEFAULT_SCHEDULER_CONF);
-
-  LOG(INFO) << "Scheduler initialized";
-  mesos::FrameworkInfo framework;
-  framework.set_user("");
-  framework.set_name("SINGA");
-
-  SchedulerDriver *driver = new mesos::MesosSchedulerDriver(scheduler, framework, msg.master().c_str());
-  LOG(INFO) << "Starting SINGA framework...";
-  status = driver->run();
-  driver->stop();
-  LOG(INFO) << "Stoping SINGA framework...";
-
-  return status == mesos::DRIVER_STOPPED ? 0 : 1;
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2e21c62/tool/node.sh
----------------------------------------------------------------------
diff --git a/tool/node.sh b/tool/node.sh
deleted file mode 100755
index 653806f..0000000
--- a/tool/node.sh
+++ /dev/null
@@ -1,88 +0,0 @@
-#/**
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-#!/bin/bash
-if [[ $# < 2 || ! -f $2 ]]
-then
-  echo "Usage: process/folder management"
-  echo "[cat, create, delete, kill, ls, ps, reset, scp, ssh] hostfile [args]"
-  echo "   cat hostfile file--- cat the file on every node in hostfile"
-  echo "   create hostfile folder--- create the folder on every node in hostfile"
-  echo "   delete hostfile folder--- delete the folder on every node in hostfile"
-  echo "   kill hostfile job_name---  kill the job on every node in hostfile"
-  echo "   ls hostfile folder--- list the folder on every node in hostfile"
-  echo "   ps hostfile job_name---  ps aux|grep job_name on every node in hostfile"
-  echo "   reset hostfile folder--- delete and create the folder on every node in hostfile"
-  echo "   scp hostfile local_dir [remote_dir]--- copy the local_dir to remote_dir on every
node in hostfile, if remote_dir is omitted, remote_dir=local_dir"
-  echo "   ssh hostfile--- test whether the nodes in hostfile are alive"
-  echo "each line in hostfile is a node name followed by a space and other fields"
-  exit
-fi
-
-ssh_options="-oStrictHostKeyChecking=no \
--oUserKnownHostsFile=/dev/null \
--oLogLevel=quiet"
-
-hosts=(`cat $2 |cut -d ' ' -f 1`)
-
-for i in ${hosts[@]}
-do
-  if [ $1 == "cat" ]
-  then
-    cmd="cat $3"
-  elif [ $1 == "create" -o $1 == "reset" ]
-  then
-    cmd="mkdir -p $3"
-  elif [ $1 == "delete" -o $1 == "reset" ]
-  then
-    cmd="rm -rf $3"
-  elif [ $1 == "kill" ]
-  then
-    cmd="ps ax|pgrep $3 |xargs kill"
-  elif [ $1 == "ls" ]
-  then
-    cmd="ls -l $3"
-  elif [ $1 == "scp" ]
-  then
-    local_dir=$3
-    remote_dir=$3
-    if [ $# -eq 4 ]
-    then
-      remote_dir=$4
-    fi
-    r=''
-    if [[ -d $3 ]]
-    then
-      r='-r'
-    fi
-    echo "scp $r $local_dir $i:$remote_dir"
-    scp $r $local_dir $i:$remote_dir
-  elif [ $1 == "ssh" ]
-  then
-    cmd="exit"
-  elif [ $1 == "ps" ]
-  then
-    cmd="ps ax|pgrep $3"
-  else
-    echo "Incorrect commands:" $1
-  fi
-  if [ $1 != "scp" ]
-  then
-    echo $cmd
-    ssh $i $cmd
-  fi
-done


Mime
View raw message