Creating Custom Workflows



A guide to authoring Cloudify Workflows

Note

This section is aimed at advanced users. Before reading it, make sure you have a good understanding of Workflows, Blueprints, and Plugins.

Introduction to Implementing Workflows

Implementing workflows is similar to implementing plugins in several ways:

  • Workflows are also implemented as Python functions.
  • A workflow method is, optionally, decorated with @workflow, a decorator from the cloudify.decorators module of the cloudify-plugins-common package.
  • Workflow methods should import ctx from cloudify.workflows, which offers access to context data and various system services. While sharing some resemblance, this ctx object is not of the same type as the one used by a plugin method.

Example: A typical workflow method’s signature

  
from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def my_workflow(**kwargs):
    pass
  

Workflow parameters are received by the method as named parameters, and so if a workflow uses parameters they should usually appear in the method signature as well (or they’ll end up under kwargs).
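As a rough illustration of how parameters reach the method, the sketch below calls a plain Python function the way a workflow method is called, with every parameter passed by name. The @workflow decorator and the ctx import are left out so the snippet runs without Cloudify installed, and the parameter names (operation, retries, verbose) are hypothetical.

```python
# Plain-Python sketch: no @workflow decorator and no ctx, so it runs
# without Cloudify. All parameter names here are hypothetical.
def my_workflow(operation, retries, **kwargs):
    # 'operation' and 'retries' are bound by name; any other parameter
    # that was passed ends up in kwargs.
    return {'operation': operation, 'retries': retries, 'extra': kwargs}


# Parameters arrive as named arguments, much like unpacking a dict.
params = {'operation': 'my_interface.my_task', 'retries': 3, 'verbose': True}
result = my_workflow(**params)
print(result['extra'])  # {'verbose': True}
```

Because verbose does not appear in the signature, it is only reachable through kwargs, which is why declared parameters are usually listed explicitly.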

Tip

It’s recommended not to have default values for parameters in the workflow method’s signature. Instead, the default values should be provided in the workflow’s parameters declaration in the blueprint. That way, the parameter and its default value are visible to the user both in the blueprint and via CLI commands, and the user can override the default value by simply editing the blueprint, without modifying code. For more information on workflow parameters declaration, refer to the Blueprint Mapping section.

There are two approaches to implementing workflows:

  • Standard workflows - workflows which simply use the APIs to execute and manage tasks.
  • Graph-based workflows - workflows which use the APIs on top of the Graph framework, a framework which offers a simplified process of scheduling and creating dependencies among tasks, as well as built-in support for some of the common aspects of workflows (e.g. cancellation support).

APIs

The ctx object used by workflow methods is of type CloudifyWorkflowContext. It offers access to context data and various services. Additionally, any node, node instance, relationship or relationship instance objects returned by the context object (or from objects returned by the context object) will be wrapped in types which offer additional context data and services.

For full API reference, refer to the documentation over at cloudify-plugins-common.readthedocs.org.

Blueprint Mapping

As is the case with plugins, it’s the workflow author’s responsibility to write a YAML file (named plugin.yaml by convention) which will contain both the workflow mapping and the workflow’s plugin declaration.

Mapping a workflow name to a workflow implementation in the blueprint is done in a similar fashion to the way plugin operations are mapped, i.e. in one of two ways:

  • Simple mapping - this should be used to map a workflow name to a workflow implementation which requires no parameters.

Example: Mapping my_workflow to be implemented by the my_workflow_method_name method in the my_workflow_module_name module in the my_workflow_plugin_name plugin

  
workflows:
  my_workflow: my_workflow_plugin_name.my_workflow_module_name.my_workflow_method_name
    

  • Mapping with parameters - this should be used to map a workflow name to a workflow implementation which uses parameters.

Workflow parameters declaration is done in a similar manner to the way type properties are declared: It’s structured as a schema map, where each entry specifies the parameter schema.

Example: The same mapping, this time with a parameters declaration: a single mandatory parameter named mandatory_parameter, and two optional parameters with default values (one of which is a complex value)

  
workflows:
  my_workflow:
    mapping: my_workflow_plugin_name.my_workflow_module_name.my_workflow_method_name
    parameters:
      mandatory_parameter:
        description: this parameter is mandatory
      optional_parameter:
        description: this parameter is optional
        default: optional_parameter_default_value
      nested_parameter:
        description: >
          this parameter is also optional,
          its default value has nested values.
        default:
          key1: value1
          key2: value2
    


Workflow implementations are considered workflow plugins. As such, they are joined to the blueprint using the exact same mapping that’s used to join regular plugins, e.g.:

  
plugins:
  my_workflow_plugin_name:
    executor: central_deployment_agent
    source: http://example.com/url/to/plugin.zip
  

Note

It’s currently impossible to override a workflow mapping in the blueprint.

Cancellation Support

A workflow should support graceful (AKA Standard) cancellation. It is up to the workflow author to decide the semantics of graceful in this regard (and document them properly): one workflow may merely stop the execution, while another may perform a rollback, and so on.

Standard workflows which don’t implement such support will simply ignore the cancellation request and continue executing the workflow. To implement cancellation support for standard workflows, some constructs from the cloudify.workflows.workflows_api module need to be used.

Importing the module is done as so:

from cloudify.workflows import api

Then, api.has_cancel_request() can be used to determine whether the workflow execution should be cancelled due to a standard cancellation request. If it returns True, the workflow should take whichever actions its author deems as a proper graceful cancellation, and then raise an api.ExecutionCancelled error.
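The polling pattern described above might look roughly like the following sketch. To keep it runnable without Cloudify, it uses a small stand-in object (FakeApi, hypothetical) in place of the real api module; only has_cancel_request and ExecutionCancelled are modelled, and the per-item "work" is a placeholder.

```python
# Stand-in for cloudify.workflows.api so the sketch runs on its own.
# In a real workflow you would use: from cloudify.workflows import api
class ExecutionCancelled(Exception):
    pass


class FakeApi(object):
    ExecutionCancelled = ExecutionCancelled

    def __init__(self, cancel_after_checks):
        self._checks_left = cancel_after_checks

    def has_cancel_request(self):
        # Pretend a cancel request arrives after a few checks.
        self._checks_left -= 1
        return self._checks_left < 0


def process_items(api, items, completed):
    """Hypothetical standard workflow body that polls for cancellation."""
    for item in items:
        if api.has_cancel_request():
            # Graceful cancellation: stop picking up new work,
            # then signal that the execution was cancelled.
            raise api.ExecutionCancelled()
        completed.append(item)  # placeholder for the real per-item work


api = FakeApi(cancel_after_checks=2)
completed = []
try:
    process_items(api, ['a', 'b', 'c'], completed)
except ExecutionCancelled:
    print('cancelled after', completed)  # cancelled after ['a', 'b']
```

Checking between units of work, rather than in the middle of one, keeps the point at which the workflow stops predictable.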

Note

Waiting for a task to end by calling the method get of a WorkflowTaskResult object will make the execution go into blocking mode which responds to cancel requests by raising an api.ExecutionCancelled error. This means that standard workflows which use this method will in fact respond to a cancel request, even if that request was sent before the get method was called.

Furthermore, when a standard workflow’s code has finished running, the execution doesn’t actually end until all tasks that have been launched have completed as well. This is implemented by iterating over all tasks whose get method hasn’t yet been called and calling get on each one. Therefore, if a cancel request was issued and the workflow launched any task on which get hadn’t been called before the request was received, the workflow will respond to the cancel request during this final waiting phase: it ends with a cancelled status, without waiting for any remaining tasks to finish.

Graph-based workflows have inherent support for graceful cancellation. Upon receiving such a request once the graph’s execute method has been called, the defined behavior is for the workflow execution to simply end - yet any tasks related to the execution which might have been running at the moment of cancellation will continue to run until they’re over.

Once the graph execution ends, the tasks_graph’s method execute will raise the api.ExecutionCancelled error.

For both types of workflows, it’s of course possible to catch api.ExecutionCancelled errors that may have been raised, which allows the workflow to perform any sort of cleanup or custom behavior before re-raising the error.
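A minimal sketch of the catch, clean up and re-raise pattern follows. ExecutionCancelled and FakeGraph are stand-ins defined here so the snippet runs without Cloudify; in a real workflow the error would be api.ExecutionCancelled and the graph would come from ctx.graph_mode(). The cleanup step is hypothetical.

```python
class ExecutionCancelled(Exception):
    """Stand-in for api.ExecutionCancelled."""


class FakeGraph(object):
    """Stand-in for a tasks graph whose execution gets cancelled."""

    def execute(self):
        raise ExecutionCancelled()


def run(graph, cleanup_log):
    try:
        return graph.execute()
    except ExecutionCancelled:
        # Custom behavior on cancellation (hypothetical cleanup step).
        cleanup_log.append('released temporary resources')
        raise  # re-raise so the execution still ends as cancelled


log = []
cancelled = False
try:
    run(FakeGraph(), log)
except ExecutionCancelled:
    cancelled = True
print(cancelled, log)
```

Re-raising matters here: swallowing the error would make the execution look as if it had completed normally.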

Deprecation Notice

The api.EXECUTION_CANCELLED_RESULT value, which may have been returned from a workflow to signal that it has cancelled successfully, is now deprecated. Raise the api.ExecutionCancelled error instead to indicate such an event.

Backwards Compatibility Notice

The Graph API will now raise an api.ExecutionCancelled error instead of returning the deprecated api.EXECUTION_CANCELLED_RESULT in the event of an execution cancellation. This means that any workflow which performs additional operations after the call to the graph’s execute method should now use a try-finally clause, so that it can perform these additional operations and still raise the appropriate error once they’re done.

Note

Neither standard workflows nor graph-based workflows have any control over force-cancellation requests. Any workflow execution which was issued with a force-cancellation request will be terminated immediately, while any tasks related to the execution which might have been running at the moment of termination will continue to run until they’re over.

Step by Step Tutorial

In this tutorial we will create from scratch a custom graph-based workflow whose purpose is to execute plugin operations.

The tutorial will offer some guidance and reference about the following:

  • Graph framework
    • Adding tasks
    • Creating dependencies between tasks
    • Using the TaskSequence construct
    • Using the forkjoin construct
  • Workflow APIs
    • Executing node operations
    • Executing relationship operations
    • Sending events
    • Working with nodes, node instances, relationships and relationship instances
  • Workflow parameters

Requirements

Similarly to plugins, workflows require the cloudify-plugins-common package to be able to use the Cloudify workflows API and framework.

Implementing the Workflow

We’ll be implementing the workflow one step at a time, where in each step we’ll have a valid, working workflow, but with more features than the one in the previous step.

This is the basic implementation of the desired behavior as a graph-based workflow:

  
from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def run_operation(**kwargs):
    graph = ctx.graph_mode()

    for node in ctx.nodes:
        for instance in node.instances:
            graph.add_task(instance.execute_operation('cloudify.interfaces.lifecycle.configure'))

    return graph.execute()
  

Step explanation:

  • The first thing we do is to set the workflow to be in graph mode, indicating we’ll be using the graph framework (line 6).
  • Then, we iterate over all node instances, and add an execute operation task for each instance to the graph (lines 8-10).
  • Finally, we tell the graph framework we’re done building our tasks graph and that execution may commence (line 12).

The basic workflow is great, if we always want to execute the exact same operation. How about we make it a bit more dynamic?

Let’s add some workflow parameters:

  
from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def run_operation(operation, type_name, operation_kwargs, **kwargs):
    graph = ctx.graph_mode()

    for node in ctx.nodes:
        if type_name in node.type_hierarchy:
            for instance in node.instances:
                graph.add_task(instance.execute_operation(operation, kwargs=operation_kwargs))

    return graph.execute()
  

Step explanation:

  • The workflow method now receives three additional parameters: operation, type_name and operation_kwargs (line 5).
  • The operation parameter lets the workflow execute operations dynamically, rather than hardcoded ones. The operation_kwargs parameter is used to pass parameters to the operation itself (line 11).
  • Since the operation parameter value might be an operation which only exists for certain types, the type_name parameter is used to determine if the node at hand is of that type or from one derived from it (line 9).
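The effect of the type_name filter can be illustrated with stand-in objects. The Node tuple below is hypothetical and models only the two attributes the filter touches; the contents and ordering of each type_hierarchy list are assumptions made for the example.

```python
from collections import namedtuple

# Hypothetical stand-in for a workflow node: only 'id' and
# 'type_hierarchy' are modelled.
Node = namedtuple('Node', ['id', 'type_hierarchy'])

nodes = [
    Node('vm', ['cloudify.nodes.Root', 'cloudify.nodes.Compute']),
    Node('db', ['cloudify.nodes.Root', 'cloudify.nodes.Database']),
]


def matching_node_ids(nodes, type_name):
    # Same membership test as in the workflow above.
    return [node.id for node in nodes if type_name in node.type_hierarchy]


print(matching_node_ids(nodes, 'cloudify.nodes.Compute'))  # ['vm']
print(matching_node_ids(nodes, 'cloudify.nodes.Root'))     # ['vm', 'db']
```

A node matches when type_name is its own type or any type it derives from, so passing the root type selects every node.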

The workflow’s much more functional now, but we’re pretty much in the dark when executing it. We’d like to know what’s happening at every point in time.

We’ll make the workflow more visible by sending out events:

  
from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def run_operation(operation, type_name, operation_kwargs, **kwargs):
    graph = ctx.graph_mode()

    for node in ctx.nodes:
        if type_name in node.type_hierarchy:
            for instance in node.instances:

                sequence = graph.sequence()

                sequence.add(
                    instance.send_event('Starting to run operation'),
                    instance.execute_operation(operation, kwargs=operation_kwargs),
                    instance.send_event('Done running operation'))

    return graph.execute()
  

Step explanation:

  • We create a TaskSequence object (named sequence) in the graph. We’ll be using it to control the tasks’ dependencies, ensuring the events are only sent when they should. Note that since this is done for each node instance separately, the sequences for the various node instances don’t depend on one another, and will be able to run in parallel (line 12).
  • Three tasks are inserted into the sequence - the original execute_operation task, wrapped by two send_event tasks. We used the send_event method of the instance (of type CloudifyWorkflowNodeInstance) rather than the send_event method of the ctx object (of type CloudifyWorkflowContext) since this way the event will contain node context information (lines 14-17).
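Conceptually, a sequence behaves like a chain in which each task depends on the one added before it. The toy model below shows that idea in plain Python; it is not Cloudify’s implementation, and the task names are placeholder strings.

```python
def sequence_dependencies(*tasks):
    """Return (task, depends_on) pairs that chain the tasks in order."""
    return list(zip(tasks[1:], tasks[:-1]))


pairs = sequence_dependencies('send_start_event',
                              'execute_operation',
                              'send_done_event')
for task, depends_on in pairs:
    print('{0} waits for {1}'.format(task, depends_on))
```

Three tasks yield two dependencies, which is what guarantees that the events bracket the operation.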

Let’s assume we wish for nodes to execute the operation in order, according to their relationships: each node should only execute the operation once all the nodes which it has relationships to are done executing the operation themselves.

We’ll achieve this behavior by adding task dependencies in the graph:

  
from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def run_operation(operation, type_name, operation_kwargs, **kwargs):
    graph = ctx.graph_mode()

    send_event_starting_tasks = {}
    send_event_done_tasks = {}

    for node in ctx.nodes:
        if type_name in node.type_hierarchy:
            for instance in node.instances:
                send_event_starting_tasks[instance.id] = instance.send_event('Starting to run operation')
                send_event_done_tasks[instance.id] = instance.send_event('Done running operation')

    for node in ctx.nodes:
        if type_name in node.type_hierarchy:
            for instance in node.instances:

                sequence = graph.sequence()

                sequence.add(
                    send_event_starting_tasks[instance.id],
                    instance.execute_operation(operation, kwargs=operation_kwargs),
                    send_event_done_tasks[instance.id])

    for node in ctx.nodes:
        for instance in node.instances:
            for rel in instance.relationships:

                instance_starting_task = send_event_starting_tasks.get(instance.id)
                target_done_task = send_event_done_tasks.get(rel.target_id)

                if instance_starting_task and target_done_task:
                    graph.add_dependency(instance_starting_task, target_done_task)

    return graph.execute()
  

Step explanation:

  • We aim to create a task dependency between each node’s first task (the send_event about starting the operation) and the last task (the send_event about finishing the operation) of each node it has a relationship to.
  • First, we had to somewhat refactor the existing code. We need references to those tasks in order to create the dependencies, so we first created the tasks and stored them in two simple dictionaries which map each instance ID to that instance’s relevant tasks (lines 8-15).
  • When adding the tasks to the sequence, we add the tasks we’ve already created (lines 24 + 26).
  • Finally, we have a new section in the code, in which we go over all instances’ relationships, retrieve the source instance’s first task and the target instance’s last task, and if both exist (they might not, since the source and/or target node might not be of type type_name or of a type derived from it), create a dependency between them (lines 28-36).
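To see what ordering these dependencies produce, here is a toy model that uses the standard library’s graphlib module (Python 3.9 or later) instead of the Cloudify graph. The deployment (webserver and db, each with a relationship targeting vm) and the task names are hypothetical.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical deployment: 'webserver' and 'db' each have a relationship
# whose target is 'vm'.
relationships = {'webserver': ['vm'], 'db': ['vm'], 'vm': []}

# Map every task to the set of tasks it depends on.
depends_on = {}
for instance_id, targets in relationships.items():
    start, op, done = (instance_id + suffix
                       for suffix in ('.start', '.operation', '.done'))
    depends_on[op] = {start}   # the per-instance sequence
    depends_on[done] = {op}
    # The source's first task waits for each target's last task.
    depends_on[start] = {target + '.done' for target in targets}

order = list(TopologicalSorter(depends_on).static_order())
print(order.index('vm.done') < order.index('webserver.start'))  # True
```

Any valid order places all three vm tasks before the first task of webserver or db, while webserver and db remain free to run in parallel with each other.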

The workflow we’ve created thus far seems great for running node operations, but what about relationship operations?

Let’s add support for those too:

  
from cloudify.decorators import workflow
from cloudify.workflows import ctx
from cloudify.workflows.tasks_graph import forkjoin

@workflow
def run_operation(operation, type_name, operation_kwargs, is_node_operation, **kwargs):
    graph = ctx.graph_mode()

    send_event_starting_tasks = {}
    send_event_done_tasks = {}

    for node in ctx.nodes:
        if type_name in node.type_hierarchy:
            for instance in node.instances:
                send_event_starting_tasks[instance.id] = instance.send_event('Starting to run operation')
                send_event_done_tasks[instance.id] = instance.send_event('Done running operation')

    for node in ctx.nodes:
        if type_name in node.type_hierarchy:
            for instance in node.instances:

                sequence = graph.sequence()

                if is_node_operation:
                    operation_task = instance.execute_operation(operation, kwargs=operation_kwargs)
                else:
                    forkjoin_tasks = []
                    for relationship in instance.relationships:
                        forkjoin_tasks.append(relationship.execute_source_operation(operation))
                        forkjoin_tasks.append(relationship.execute_target_operation(operation))
                    operation_task = forkjoin(*forkjoin_tasks)

                sequence.add(
                    send_event_starting_tasks[instance.id],
                    operation_task,
                    send_event_done_tasks[instance.id])

    for node in ctx.nodes:
        for instance in node.instances:
            for rel in instance.relationships:

                instance_starting_task = send_event_starting_tasks.get(instance.id)
                target_done_task = send_event_done_tasks.get(rel.target_id)

                if instance_starting_task and target_done_task:
                    graph.add_dependency(instance_starting_task, target_done_task)

    return graph.execute()
  

Step explanation:

  • The workflow now has a new parameter, is_node_operation: a boolean which represents whether the operation to execute is a node operation or a relationship operation (line 6).
  • We had a tiny bit of refactoring done: if the operation is a node operation, we create the task somewhat earlier and store it in a variable named operation_task, which is later inserted into the graph in the sequence as before (lines 24-25 + 35).
  • If the operation is a relationship operation, we first collect all tasks related to the current instance that should be executed, by going over all of the instance’s relationships and creating tasks for both source and target operations (lines 26-30).
  • Finally, we create a single forkjoin task which contains all of the tasks we’ve collected, and store it in the operation_task variable so it’ll later be inserted into the sequence. This allows all of the relationship operations we’ve collected to run in parallel, while making sure none of them runs before the first sequence task (the send_event about starting the operation) completes, and also ensuring they all complete before the last sequence task (the send_event about finishing the operation) is started (line 31).
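The ordering guarantee of a fork-join inside a sequence can be mimicked in plain Python with a thread pool. This is a generic fork-join sketch, not Cloudify’s forkjoin; the helper names (run_forkjoin, make_task) and task labels are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def run_forkjoin(first, parallel_tasks, last):
    """Run first, then all parallel_tasks concurrently, then last."""
    results = [first()]
    with ThreadPoolExecutor(max_workers=max(1, len(parallel_tasks))) as pool:
        # Fork: submit every task without waiting for the others.
        futures = [pool.submit(task) for task in parallel_tasks]
        # Join: wait for every forked task before moving on.
        results.extend(future.result() for future in futures)
    results.append(last())
    return results


def make_task(name):
    return lambda: name


results = run_forkjoin(
    make_task('starting event'),
    [make_task('source operation'), make_task('target operation')],
    make_task('done event'))
print(results)
```

The forked tasks may finish in any order, but results are gathered in submission order, and the last task only runs once all of them have returned.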

We could continue improving our workflow and extending its features, but in the scope of this tutorial, this last version of the workflow will be the one we’ll be using throughout the remaining tutorial sections.

Blueprint Mappings

The workflow plugin declaration will look like this:

  
plugins:
  my_workflow_plugin_name:
    executor: central_deployment_agent
    source: http://example.com/url/to/plugin.zip
  

The workflow mapping may look like so:

  
workflows:
  my_workflow:
    mapping: my_workflow_plugin_name.my_workflow_module_name.run_operation
    parameters:
      operation:
        description: the operation to execute
      type_name:
        description: the base type for filtering nodes
        default: cloudify.nodes.Root
      operation_kwargs:
        description: the operation kwargs
        default: {}
      is_node_operation:
        description: >
          is the operation a node operation or
          a relationship operation otherwise
        default: true
  

This will define a workflow named my_workflow, whose implementation is the run_operation workflow method we coded.

The workflow has four parameters declared:

  • The mandatory operation parameter
  • The optional type_name parameter, which defaults to cloudify.nodes.Root (meaning the operation will run on all nodes if this value isn’t overridden)
  • The optional operation_kwargs parameter, which defaults to an empty dictionary.
  • The optional is_node_operation parameter, which defaults to true.
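The combined effect of this declaration can be modelled in a few lines of plain Python. This is only an illustration of the observable result (defaults filled in, mandatory parameters required); resolve_parameters is a hypothetical helper, and how Cloudify itself validates and merges parameters is not covered here.

```python
# Declared parameters, mirroring the mapping above.
declared = {
    'operation': {},
    'type_name': {'default': 'cloudify.nodes.Root'},
    'operation_kwargs': {'default': {}},
    'is_node_operation': {'default': True},
}


def resolve_parameters(declared, supplied):
    """Merge user-supplied values over declared defaults (illustrative)."""
    resolved = {}
    for name, schema in declared.items():
        if name in supplied:
            resolved[name] = supplied[name]
        elif 'default' in schema:
            resolved[name] = schema['default']
        else:
            raise ValueError('missing mandatory parameter: ' + name)
    return resolved


params = resolve_parameters(
    declared, {'operation': 'cloudify.interfaces.lifecycle.configure'})
print(params['type_name'])  # cloudify.nodes.Root
```

The resulting dictionary corresponds to the named arguments that run_operation receives when only operation is supplied.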

Packaging the Workflow

Since workflows are joined to the blueprint the same way plugins are, they are also packaged the same way. Refer to the Plugin creation guide for more information.

Advanced Usage

What follows are code snippets showing some of the more advanced APIs exposed by the workflow framework.

Task Handlers

Task handlers are callbacks you set on tasks. They get called when a task fails or succeeds.

  
from cloudify.decorators import workflow
from cloudify.workflows import ctx
from cloudify.workflows import tasks

@workflow
def use_task_handlers(**kwargs):

    graph = ctx.graph_mode()
    node = ctx.get_node('some_node')
    instance = next(node.instances)

    def on_success(task):
        instance.logger.info('Task {0} succeeded!'.format(task.id))
        # HandlerResult.cont() is the default for on_success.
        # If a task handler is defined for a task, it must return
        # a HandlerResult instance
        return tasks.HandlerResult.cont()

    def on_failure(task):
        instance.logger.info('Task {0} failed :('.format(task.id))
        # Handler result may override the default behavior.
        # If for example the task was to be retried,
        # this will cause the framework to ignore the failure
        # and move on. (the default in this case is HandlerResult.fail())
        return tasks.HandlerResult.ignore()

    task = instance.execute_operation('my_interface.my_task')
    task.on_success = on_success
    task.on_failure = on_failure

    graph.add_task(task)

    return graph.execute()
  

Deployment Modification

Deployment modification changes the data model to add or remove node instances, and returns the modified node instances so that the workflow can operate on them. The built-in scale workflow makes use of this API to scale a node’s number of instances up or down.

  
from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def use_modify(**kwargs):
    new_number_of_instances = 12

    node_id = 'webserver_vm'
    node = ctx.get_node(node_id)
    if node.number_of_instances == new_number_of_instances:
        # no change is required
        return

    modification = ctx.deployment.start_modification({
        node.id: {
            'instances': new_number_of_instances
        }
    })

    going_up = node.number_of_instances < new_number_of_instances
    try:
        if going_up:
            # added.node_instances returns all node instances that are
            # affected by increasing a node's number of instances.
            # Some are newly added and have their
            # instance.modification == 'added'.
            # Others are node instances that have new relationships
            # to the added node instances.
            added_and_related = modification.added.node_instances

            for instance in added_and_related:
                if instance.modification == 'added':
                    # do stuff
                    pass
                else:
                    # do other stuff
                    pass
        else:
            # removed.node_instances returns all node instances that are
            # affected by decreasing a node's number of instances.
            # Some are removed and have their
            # instance.modification == 'removed'.
            # Others are node instances whose relationships to the
            # removed node instances will be removed after calling
            # modification.finish().
            removed_and_related = modification.removed.node_instances

            for instance in removed_and_related:
                if instance.modification == 'removed':
                    # do stuff
                    pass
                else:
                    # do other stuff
                    pass
    except:
        # Do stuff to restore the logical state and then
        # call this to restore that storage state
        modification.rollback()
        raise
    else:
        modification.finish()
  

Subgraphs (Experimental)

Subgraphs provide a means for easier modeling of complex workflows, by grouping certain operations into their own subgraphs and creating dependencies between different subgraphs. Subgraphs expose both the Task and Graph APIs: they have methods for creating dependencies between the tasks they contain (Graph API), and dependencies can be created between them and other tasks, which may themselves be subgraphs (Task API).

Note

The Subgraph API is still in its early stages and may change in backward-incompatible ways in the future.

  
from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def use_subgraphs(**kwargs):

    graph = ctx.graph_mode()
    node = ctx.get_node('some_node')
    instance = next(node.instances)

    # A subgraph to create some node instance
    # creating a subgraph also adds it as a task to the graph from
    # which it was created.
    start_subgraph = graph.subgraph('some_start_subgraph')
    start_subgraph_sequence = start_subgraph.sequence()
    start_subgraph_sequence.add(
        instance.execute_operation('my_interface.create'),
        instance.execute_operation('my_interface.configure'),
        instance.execute_operation('my_interface.start'))

    # A subgraph to upgrade some node instance
    upgrade_subgraph = graph.subgraph('some_upgrade_subgraph')
    upgrade_subgraph_sequence = upgrade_subgraph.sequence()
    upgrade_subgraph_sequence.add(
        instance.execute_operation('my_interface.upgrade.step1'),
        instance.execute_operation('my_interface.upgrade.step2'),
        instance.execute_operation('my_interface.upgrade.step3'))

    # Start running operations on the upgrade subgraph
    # only when the start subgraph ended
    graph.add_dependency(upgrade_subgraph, start_subgraph)

    return graph.execute()
  

Contained Subgraph

Get all node instances that are contained in a node instance. The built-in heal workflow makes use of this API to calculate all node instances that belong to a cloudify.nodes.Compute node that should be healed.

  
from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def use_contained_subgraph(**kwargs):
    node = ctx.get_node('some_node')
    instance = next(node.instances)
    # get node instances that are directly contained in the instance
    for contained_instance in instance.contained_instances:
        # do something
        pass

    # get node instances that are recursively contained in the instance
    # (including the instance itself)
    for contained_instance in instance.get_contained_subgraph():
        # do something
        pass
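
The difference between direct and recursive containment can be shown with a toy model. The containment tree and instance IDs below are hypothetical, and the two helper functions only imitate what contained_instances and get_contained_subgraph are described as returning; they are not the Cloudify implementation.

```python
# Hypothetical containment tree: instance id -> ids directly contained in it.
contained_in = {
    'vm_1': ['webserver_1', 'db_1'],
    'webserver_1': ['app_1'],
    'db_1': [],
    'app_1': [],
}


def directly_contained(instance_id):
    """Only the instances one level below instance_id."""
    return list(contained_in[instance_id])


def contained_subgraph(instance_id):
    """instance_id plus everything contained in it, recursively."""
    result = [instance_id]
    for child in contained_in[instance_id]:
        result.extend(contained_subgraph(child))
    return result


print(directly_contained('vm_1'))  # ['webserver_1', 'db_1']
print(contained_subgraph('vm_1'))  # ['vm_1', 'webserver_1', 'app_1', 'db_1']
```

The recursive variant also returns the starting instance, which is what a heal-style workflow needs when the whole stack on a compute node must be handled together.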