Return-Path: X-Original-To: apmail-kylin-commits-archive@minotaur.apache.org Delivered-To: apmail-kylin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7AEF919F89 for ; Sun, 27 Mar 2016 01:32:06 +0000 (UTC) Received: (qmail 33223 invoked by uid 500); 27 Mar 2016 01:32:06 -0000 Delivered-To: apmail-kylin-commits-archive@kylin.apache.org Received: (qmail 33137 invoked by uid 500); 27 Mar 2016 01:32:06 -0000 Mailing-List: contact commits-help@kylin.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.apache.org Delivered-To: mailing list commits@kylin.apache.org Received: (qmail 31942 invoked by uid 99); 27 Mar 2016 01:32:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 27 Mar 2016 01:32:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 82DE0E97E6; Sun, 27 Mar 2016 01:32:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shaofengshi@apache.org To: commits@kylin.apache.org Date: Sun, 27 Mar 2016 01:32:42 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [39/50] [abbrv] kylin git commit: KYLIN-1249 A client library to help automatic cube http://git-wip-us.apache.org/repos/asf/kylin/blob/5593d824/tools/kylin_client_tool/scheduler/workers/cube.py ---------------------------------------------------------------------- diff --git a/tools/kylin_client_tool/scheduler/workers/cube.py b/tools/kylin_client_tool/scheduler/workers/cube.py new file mode 100644 index 0000000..b18be6e --- /dev/null +++ b/tools/kylin_client_tool/scheduler/workers/cube.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +__author__ = 'Huang, Hua' + +import time +import datetime +import calendar +from 
class CubeWorker:
    """Schedule Kylin cube build jobs and track their progress.

    All state is held in class attributes and every operation is a static
    method, so the class behaves as a single process-wide worker.
    """

    # cube name -> None (build not started yet), a JobInstance (build
    # submitted) or 0 (given up after KYLIN_JOB_MAX_RETRY retries).
    job_instance_dict = {}
    # cube name -> index of the latest build attempt, counted from 0.
    job_retry_dict = {}
    # The three attributes below are not read or written inside this class.
    # NOTE(review): presumably assigned by the scheduling driver - confirm.
    scheduler = None
    run_cube_job_id = None
    check_cube_job_id = None

    def __init__(self):
        pass

    @staticmethod
    def _end_time_millis(endtime):
        """Return the build end time as milliseconds since the epoch (UTC).

        :param endtime: a ``"%Y-%m-%d"`` date string, read as midnight UTC of
            that day, or None to use the current time.
        :raises ValueError: if ``endtime`` does not match ``"%Y-%m-%d"``.
        """
        if endtime is not None:
            moment = datetime.datetime.strptime(endtime, "%Y-%m-%d")
        else:
            moment = datetime.datetime.utcnow()
        # timegm() interprets the tuple as UTC. The previous
        # mktime() - time.timezone arithmetic was one hour off whenever the
        # local zone was observing daylight saving time.
        return calendar.timegm(moment.utctimetuple()) * 1000

    @staticmethod
    def run_cube_job(endtime):
        """Submit builds for cubes that are not started or whose job failed.

        At most ``KYLIN_JOB_MAX_COCURRENT`` jobs are kept running. A cube
        whose job is in ERROR state has its error jobs cancelled and is
        rebuilt, until ``KYLIN_JOB_MAX_RETRY`` is reached; after that the
        cube is marked with 0 and left alone.

        :param endtime: ``"%Y-%m-%d"`` end date of the segment to build, or
            None to build up to the current time.
        :return: True when ``all_finished()`` already holds, otherwise None.
        """
        if CubeWorker.all_finished():
            return True

        running_job_list = CubeWorker.get_current_running_job()
        print("current running %d jobs" % len(running_job_list))

        if running_job_list and len(running_job_list) >= KYLIN_JOB_MAX_COCURRENT:
            print("will not schedule new jobs this time because running job "
                  "number >= the max cocurrent job number %s"
                  % KYLIN_JOB_MAX_COCURRENT)
            return None

        max_allow = KYLIN_JOB_MAX_COCURRENT - len(running_job_list)
        for cube_name in CubeWorker.job_instance_dict:
            if max_allow <= 0:
                break

            job_instance = CubeWorker.job_instance_dict[cube_name]
            needs_build = job_instance is None or (
                isinstance(job_instance, JobInstance)
                and job_instance.get_status() == CubeJobStatus.ERROR)
            if not needs_build:
                continue

            try_cnt = CubeWorker.job_retry_dict.get(cube_name, -1)
            if try_cnt >= KYLIN_JOB_MAX_RETRY:
                # Have already tried KYLIN_JOB_MAX_RETRY times: give up.
                CubeWorker.job_instance_dict[cube_name] = 0
                continue

            # Cancel the failed build jobs of this cube before rebuilding.
            error_job_list = CubeJob.get_cube_job(
                cube_name, CubeJob.ERROR_JOB_STATUS)
            for error_job in error_job_list or []:
                CubeBuildJob.cancel_job(error_job.uuid)
                print("cancel an error job with uuid= %s for cube= %s"
                      % (error_job.uuid, cube_name))

            build_request = JobBuildRequest()
            build_request.endTime = CubeWorker._end_time_millis(endtime)
            current_job_instance = CubeBuildJob.rebuild_cube(
                cube_name, build_request)

            if current_job_instance:
                print("schedule a cube build job for cube = %s" % cube_name)
                CubeWorker.job_instance_dict[cube_name] = current_job_instance
                max_allow -= 1
            # Every submission attempt counts, whether or not a job came back.
            CubeWorker.job_retry_dict[cube_name] = try_cnt + 1

        return None

    @staticmethod
    def check_cube_job():
        """Refresh the state of running jobs and print each cube's status."""
        for cube_name in CubeWorker.job_instance_dict:
            job_instance = CubeWorker.job_instance_dict[cube_name]
            if (isinstance(job_instance, JobInstance) and job_instance.uuid
                    and job_instance.get_status() == CubeJobStatus.RUNNING):
                current_job_instance = CubeBuildJob.get_job(job_instance.uuid)
                if current_job_instance:
                    CubeWorker.job_instance_dict[cube_name] = current_job_instance

            # Read the entry again: it may just have been replaced above.
            job_instance = CubeWorker.job_instance_dict[cube_name]
            if job_instance is None:
                print("status of cube = %s is NOT STARTED YET" % cube_name)
            elif isinstance(job_instance, JobInstance):
                print("status of cube = %s is %s at %d/%d" % (
                    cube_name, job_instance.get_status(),
                    job_instance.get_current_step(), len(job_instance.steps)))

    @staticmethod
    def get_current_running_job():
        """Return the list of tracked JobInstance objects in RUNNING state.

        The list is empty when nothing is tracked or nothing is running, so
        callers can always take ``len()`` of the result.
        """
        running_job_list = []
        for cube_name in CubeWorker.job_instance_dict:
            job_instance = CubeWorker.job_instance_dict[cube_name]
            # The truthiness test skips the None and 0 markers.
            if (job_instance and isinstance(job_instance, JobInstance)
                    and job_instance.get_status() == CubeJobStatus.RUNNING):
                running_job_list.append(job_instance)
        return running_job_list

    @staticmethod
    def all_finished():
        """Return True when no tracked cube is waiting to start or running.

        NOTE(review): a JobInstance in ERROR state counts as finished here,
        so run_cube_job() returns early and does not retry it once no other
        cube is pending or running - confirm this is intended.
        """
        for cube_name in CubeWorker.job_instance_dict:
            job_instance = CubeWorker.job_instance_dict[cube_name]

            if job_instance == 0:
                # Given up after the retry limit; nothing more to do.
                continue
            if job_instance is None:
                return False
            if (isinstance(job_instance, JobInstance)
                    and job_instance.get_status() == CubeJobStatus.RUNNING):
                return False

        return True
#!/bin/bash
# Install the Python dependencies of kylin_client_tool (requests and
# APScheduler) from their source tarballs, downloading any tarball that is
# not already present in the current directory.

# Stop at the first failing step so that a failed extraction or install is
# not followed by further commands running in the wrong state.
set -e

# Download the URL in $1 into the current directory, using wget when it is
# installed and curl otherwise.
fetch() {
    if command -v wget >/dev/null 2>&1; then
        wget "$1"
    else
        curl -O "$1"
    fi
}

# Download (when missing), unpack and install one source distribution.
#   $1 - package name used in messages
#   $2 - tarball file name
#   $3 - download URL
install_package() {
    local name="$1" tarball="$2" url="$3"

    if [ ! -f "$tarball" ]; then
        echo "$name binary file not found"
        if ! fetch "$url"; then
            echo "download $name failed" >&2
            # Drop any partial download so the next run fetches it again.
            rm -f "$tarball"
            exit 1
        fi
    fi

    tar -zxvf "$tarball"
    # Run in a subshell so the caller's working directory is left unchanged.
    (cd "${tarball%.tar.gz}" && python setup.py install)
}

install_package requests requests-2.9.1.tar.gz \
    https://pypi.python.org/packages/source/r/requests/requests-2.9.1.tar.gz
install_package APScheduler APScheduler-3.0.5.tar.gz \
    https://pypi.python.org/packages/source/A/APScheduler/APScheduler-3.0.5.tar.gz