Workflow Tasks Graph

cloudify.workflows.tasks_graph.make_or_get_graph(f)[source]

Decorate a graph-creating function with this, to automatically make it try to retrieve the graph from storage first.

class cloudify.workflows.tasks_graph.TaskDependencyGraphError(task_name, traceback, error_causes, error_time=None)[source]

Bases: object

class cloudify.workflows.tasks_graph.TaskDependencyGraphErrors[source]

Bases: object

add_error(result, task)[source]
first_error_time()[source]
format_exception()[source]

Turn errors stored here into a single human-readable WorkflowFailed

This formats the actual message the user will see. Show information about all errors that happened, and a traceback.

class cloudify.workflows.tasks_graph.TaskDependencyGraph(workflow_context, graph_id=None, default_subgraph_task_config=None)[source]

Bases: object

A task graph.

Parameters:

workflow_context – A WorkflowContext instance (used for logging)

classmethod restore(workflow_context, retrieved_graph)[source]
optimize()[source]

Optimize this tasks graph, removing tasks that do nothing.

Empty subgraphs, and NOP tasks, are dropped. A subgraph is considered empty if it only contains NOP tasks, and empty subgraphs.

linearize()[source]

Traverse the graph, and return tasks in dependency order.

This makes sure that if task A depends on task B, then A is going to be after B in the resulting list. Ordering of tasks that are not related by dependencies is undefined.

This is useful for logging, debugging, and testing.

store(name, optimize=True)[source]
property tasks
add_task(task)[source]

Add a WorkflowTask to this graph

Parameters:

task – The task

get_task(task_id)[source]

Get a task instance that was inserted to this graph by its id

Parameters:

task_id – the task id

Returns:

a WorkflowTask instance for the requested task if found. None, otherwise.

remove_task(task)[source]

Remove the provided task from the graph

Parameters:

task – The task

add_dependency(src_task, dst_task)[source]

Add a dependency between tasks: src depends on dst.

The source task will only be executed after the target task terminates. A task may depend on several tasks, in which case it will only be executed after all its ‘destination’ tasks terminate

Parameters:
  • src_task – The source task

  • dst_task – The target task

remove_dependency(src_task, dst_task)[source]
sequence()[source]
Returns:

a new TaskSequence for this graph

subgraph(name)[source]
execute()[source]

Execute tasks in this graph.

Run ready tasks, register callbacks on their result, process results from tasks that did finish. Tasks whose dependencies finished, are marked as ready for the next iteration.

This main loop is directed by the _tasks_wait event, which is set only when there is something to be done: a task response has been received, some tasks dependencies finished which makes new tasks ready to be run, or the execution was cancelled.

If a task failed, wait for ctx.wait_after_fail for additional responses to come in anyway.

class cloudify.workflows.tasks_graph.forkjoin(*tasks)[source]

Bases: object

A simple wrapper for tasks. Used in conjunction with TaskSequence. Defined to make the code easier to read (instead of passing a list) see TaskSequence.add for more details

class cloudify.workflows.tasks_graph.TaskSequence(graph)[source]

Bases: object

Helper class to add tasks in a sequential manner to a task dependency graph

Parameters:

graph – The TaskDependencyGraph instance

add(*tasks)[source]

Add tasks to the sequence.

Parameters:

tasks

Each task might be: * A WorkflowTask instance, in which case, it will be

added to the graph with a dependency between it and the task previously inserted into the sequence

  • A forkjoin of tasks, in which case it will be treated as a “fork-join” task in the sequence, i.e. all the fork-join tasks will depend on the last task in the sequence (could be fork join) and the next added task will depend on all tasks in this fork-join task

class cloudify.workflows.tasks_graph.SubgraphTask(graph, workflow_context=None, task_id=None, total_retries=0, **kwargs)[source]

Bases: WorkflowTask

classmethod restore(ctx, graph, task_descr)[source]
property cloudify_context
is_local()[source]
Returns:

Is this a local task

property name
Returns:

The task name

property is_subgraph
is_nop()[source]
Returns:

Is this a NOP task

sequence()[source]
subgraph(name)[source]
add_task(task)[source]
remove_task(task)[source]
add_dependency(src_task, dst_task)[source]
apply_async()[source]
task_terminated(task, new_task=None)[source]
set_state(state)[source]

Set the task state

Parameters:

state – The state to set [pending, sending, sent, started, rescheduled, succeeded, failed]