ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject [3/4] incubator-ariatosca git commit: Fix pylint issues in aria.workflows.core package
Date Wed, 19 Oct 2016 15:11:34 GMT
Fix pylint issues in aria.workflows.core package


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/177a3f6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/177a3f6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/177a3f6c

Branch: refs/heads/pylint-aria-storage
Commit: 177a3f6ce25c2b935025b28634baaafc5261f36e
Parents: f300324
Author: Ran Ziv <ran@gigaspaces.com>
Authored: Wed Oct 19 17:24:19 2016 +0300
Committer: Ran Ziv <ran@gigaspaces.com>
Committed: Wed Oct 19 18:04:00 2016 +0300

----------------------------------------------------------------------
 .pylintrc                          |  3 +-
 aria/workflows/core/engine.py      | 12 ++++++-
 aria/workflows/core/executor.py    | 35 ++++++++++++++++----
 aria/workflows/core/tasks.py       | 57 +++++++++++++++++++++++++++++----
 aria/workflows/core/translation.py | 33 ++++++++++++++++---
 5 files changed, 119 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/177a3f6c/.pylintrc
----------------------------------------------------------------------
diff --git a/.pylintrc b/.pylintrc
index 120a828..7f4f203 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -62,8 +62,7 @@ confidence=
 # --enable=similarities". If you want to run only the classes checker, but have
 # no Warning level messages displayed, use"--disable=all --enable=classes
 # --disable=W"
-disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating
-disable=redefined-builtin
+disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin
 
 [REPORTS]
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/177a3f6c/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
index 7cc4781..9c6eff8 100644
--- a/aria/workflows/core/engine.py
+++ b/aria/workflows/core/engine.py
@@ -13,6 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""
+The workflow engine. Executes workflows
+"""
+
 import time
 
 import networkx
@@ -23,6 +27,9 @@ from . import translation
 
 
 class Engine(logger.LoggerMixin):
+    """
+    The workflow engine. Executes workflows
+    """
 
     def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
         super(Engine, self).__init__(**kwargs)
@@ -35,6 +42,9 @@ class Engine(logger.LoggerMixin):
                                           execution_graph=self._execution_graph)
 
     def execute(self):
+        """
+        execute the workflow
+        """
         try:
             events.start_workflow_signal.send(self._workflow_context)
             while True:
@@ -65,7 +75,7 @@ class Engine(logger.LoggerMixin):
         return len(self._execution_graph.succ.get(task.id, {})) > 0
 
     def _all_tasks_consumed(self):
-        len(self._execution_graph.node) == 0
+        return len(self._execution_graph.node) == 0
 
     def _tasks_iter(self):
         return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/177a3f6c/aria/workflows/core/executor.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/executor.py b/aria/workflows/core/executor.py
index 0c3aeb1..ace445a 100644
--- a/aria/workflows/core/executor.py
+++ b/aria/workflows/core/executor.py
@@ -13,6 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""
+Executors for workflow tasks
+"""
+
 import threading
 import multiprocessing
 import Queue
@@ -24,14 +28,24 @@ from aria.tools import module
 
 
 class BaseExecutor(object):
+    """
+    Base class for executors for running tasks
+    """
 
     def __init__(self, *args, **kwargs):
         pass
 
     def execute(self, task):
+        """
+        Execute a task
+        :param task: task to execute
+        """
         raise NotImplementedError
 
     def close(self):
+        """
+        Close the executor
+        """
         pass
 
     @staticmethod
@@ -48,6 +62,9 @@ class BaseExecutor(object):
 
 
 class CurrentThreadBlockingExecutor(BaseExecutor):
+    """
+    Executor which runs tasks in the current thread (blocking)
+    """
 
     def execute(self, task):
         self._task_started(task)
@@ -61,6 +78,9 @@ class CurrentThreadBlockingExecutor(BaseExecutor):
 
 
 class ThreadExecutor(BaseExecutor):
+    """
+    Executor which runs tasks in a separate thread
+    """
 
     def __init__(self, pool_size=1, *args, **kwargs):
         super(ThreadExecutor, self).__init__(*args, **kwargs)
@@ -96,11 +116,14 @@ class ThreadExecutor(BaseExecutor):
                 except BaseException as e:
                     self._task_failed(task, exception=e)
             # Daemon threads
-            except:
+            except BaseException:
                 pass
 
 
 class MultiprocessExecutor(BaseExecutor):
+    """
+    Executor which runs tasks in a multiprocess environment
+    """
 
     def __init__(self, pool_size=1, *args, **kwargs):
         super(MultiprocessExecutor, self).__init__(*args, **kwargs)
@@ -108,9 +131,9 @@ class MultiprocessExecutor(BaseExecutor):
         self._manager = multiprocessing.Manager()
         self._queue = self._manager.Queue()
         self._tasks = {}
-        self._listener = threading.Thread(target=self._listener)
-        self._listener.daemon = True
-        self._listener.start()
+        self._listener_thread = threading.Thread(target=self._listener)
+        self._listener_thread.daemon = True
+        self._listener_thread.start()
         self._pool = multiprocessing.Pool(processes=pool_size,
                                           maxtasksperchild=1)
 
@@ -126,7 +149,7 @@ class MultiprocessExecutor(BaseExecutor):
         self._pool.close()
         self._stopped = True
         self._pool.join()
-        self._listener.join()
+        self._listener_thread.join()
 
     def _listener(self):
         while not self._stopped:
@@ -143,7 +166,7 @@ class MultiprocessExecutor(BaseExecutor):
                     # TODO: something
                     raise RuntimeError()
             # Daemon threads
-            except:
+            except BaseException:
                 pass
 
     def _remove_task(self, task_id):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/177a3f6c/aria/workflows/core/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py
index 83b4263..76ae609 100644
--- a/aria/workflows/core/tasks.py
+++ b/aria/workflows/core/tasks.py
@@ -13,43 +13,87 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""
+Workflow tasks
+"""
+
 
 class BaseTask(object):
+    """
+    Base class for Task objects
+    """
 
     def __init__(self, id, name, context):
-        self.id = id
-        self.name = name
-        self.context = context
+        self._id = id
+        self._name = name
+        self._context = context
+
+    @property
+    def id(self):
+        """
+        :return: the task's id
+        """
+        return self._id
+
+    @property
+    def name(self):
+        """
+        :return: the task's name
+        """
+        return self._name
+
+    @property
+    def context(self):
+        """
+        :return: the task's context
+        """
+        return self._context
 
 
 class StartWorkflowTask(BaseTask):
+    """
+    Tasks marking a workflow start
+    """
     pass
 
 
 class EndWorkflowTask(BaseTask):
+    """
+    Tasks marking a workflow end
+    """
     pass
 
 
 class StartSubWorkflowTask(BaseTask):
+    """
+    Tasks marking a subworkflow start
+    """
     pass
 
 
 class EndSubWorkflowTask(BaseTask):
+    """
+    Tasks marking a subworkflow end
+    """
     pass
 
 
 class OperationTask(BaseTask):
+    """
+    Operation tasks
+    """
+
     def __init__(self, *args, **kwargs):
         super(OperationTask, self).__init__(*args, **kwargs)
         self._create_operation_in_storage()
 
     def _create_operation_in_storage(self):
-        Operation = self.context.storage.operation.model_cls
-        operation = Operation(
+        operation_cls = self.context.storage.operation.model_cls
+        operation = operation_cls(
             id=self.context.id,
             execution_id=self.context.execution_id,
             max_retries=self.context.parameters.get('max_retries', 1),
-            status=Operation.PENDING,
+            status=operation_cls.PENDING,
         )
         self.context.operation = operation
 
@@ -58,4 +102,3 @@ class OperationTask(BaseTask):
             return getattr(self.context, attr)
         except AttributeError:
             return super(OperationTask, self).__getattribute__(attr)
-

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/177a3f6c/aria/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/translation.py b/aria/workflows/core/translation.py
index 71d7bcd..dc483c6 100644
--- a/aria/workflows/core/translation.py
+++ b/aria/workflows/core/translation.py
@@ -13,6 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""
+Translation of user graph's API to the execution graph
+"""
+
 from aria import contexts
 
 from . import tasks
@@ -25,6 +29,15 @@ def build_execution_graph(
         start_cls=tasks.StartWorkflowTask,
         end_cls=tasks.EndWorkflowTask,
         depends_on=()):
+    """
+    Translates the user graph to the execution graph
+    :param task_graph: The user's graph
+    :param workflow_context: The workflow
+    :param execution_graph: The execution graph that is being built
+    :param start_cls: internal use
+    :param end_cls: internal use
+    :param depends_on: internal use
+    """
     # Insert start marker
     start_task = start_cls(id=_start_graph_suffix(task_graph.id),
                            name=_start_graph_suffix(task_graph.name),
@@ -32,7 +45,10 @@ def build_execution_graph(
     _add_task_and_dependencies(execution_graph, start_task, depends_on)
 
     for operation_or_workflow, dependencies in task_graph.task_tree(reverse=True):
-        operation_dependencies = _get_tasks_from_dependencies(execution_graph, dependencies, default=[start_task])
+        operation_dependencies = _get_tasks_from_dependencies(
+            execution_graph,
+            dependencies,
+            default=[start_task])
 
         if _is_operation(operation_or_workflow):
             # Add the task an the dependencies
@@ -52,8 +68,14 @@ def build_execution_graph(
             )
 
     # Insert end marker
-    workflow_dependencies = _get_tasks_from_dependencies(execution_graph, task_graph.leaf_tasks, default=[start_task])
-    end_task = end_cls(id=_end_graph_suffix(task_graph.id), name=_end_graph_suffix(task_graph.name), context=workflow_context)
+    workflow_dependencies = _get_tasks_from_dependencies(
+        execution_graph,
+        task_graph.leaf_tasks,
+        default=[start_task])
+    end_task = end_cls(
+        id=_end_graph_suffix(task_graph.id),
+        name=_end_graph_suffix(task_graph.name),
+        context=workflow_context)
     _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies)
 
 
@@ -67,8 +89,9 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
     """
     Returns task list from dependencies.
     """
-    return [execution_graph.node[dependency.id if _is_operation(dependency) else _end_graph_suffix(dependency.id)]
-            ['task'] for dependency in dependencies] or default
+    return [execution_graph.node[dependency.id if _is_operation(dependency)
+                                 else _end_graph_suffix(dependency.id)]['task']
+            for dependency in dependencies] or default
 
 
 def _is_operation(task):


Mime
View raw message