Creating Custom Workflows

Introduction to Implementing Workflows

Workflows implementation shares several similarities with plugins implementation:

Example: A typical workflow method’s signature

from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def my_workflow(**kwargs):
    pass

Workflow parameters are received by the method as named parameters, and so if a workflow uses parameters they should usually appear in the method signature as well (or they’ll end up under kwargs).

Tip

It’s recommended not to have default values for parameters in the workflow method’s signature. Instead, the default values should be provided in the workflow’s parameters declaration in the blueprint - That way, the parameter and its default value are visible to the user both in the blueprint and via CLI commands, and the user is also able to override the default value by simply editing the blueprint, without modifying code. For more information on workflow parameters declaration, refer to the Blueprint mapping section.

There are two approaches to implementing workflows:

APIs

The ctx object used by workflow methods is of type CloudifyWorkflowContext. It offers access to context data and various services. Additionally, any node, node instance, relationship or relationship instance objects returned by the context object (or from objects returned by the context object) will be wrapped in types which offer additional context data and services.

For full API reference, refer to the documentation over at cloudify-plugins-common.readthedocs.org.

Blueprint Mapping

As is the case with plugins, it’s the workflow author’s responsilibity to write a yaml file (named plugin.yaml by convention) which will contain both the workflow mapping as well as the workflow’s plugin declaration.

Mapping a workflow name to a workflow implementation in the blueprint is done in a similar fashion to the way plugin operations are mapped, i.e. in one of two ways:

Example: Mapping my_workflow to be implemented by the my_workflow_method_name method in the my_workflow_module_name module in the my_workflow_plugin_name plugin

workflows:
  my_workflow: my_workflow_plugin_name.my_workflow_module_name.my_workflow_method_name
  

Workflow parameters declaration is done in a similar manner to the way type properties are declared: It’s structured as a schema map, where each entry specifies the parameter schema.

Example: Making the same mapping yet with parameters declaration - A single mandatory parameter named mandatory_parameter, and two optional parameters with default values (one of which is a complex value)*

workflows:
  my_workflow:
    mapping: my_workflow_plugin_name.my_workflow_module_name.my_workflow_method_name
    parameters:
      mandatory_parameter:
        description: this parameter is mandatory
      optional_parameter:
        description: this paramters is optional
        default: optional_parameter_default_value
      nested_parameter:
        description: >
          this parameter is also optional,
          it's default value has nested values.
        default:
          key1: value1
          key2: value2
  


The workflows implementations are considered as workflows plugins. As such, they are joined to the blueprint using the exact same mapping that’s used to join regular plugins, e.g.:

plugins:
  my_workflow_plugin_name:
    executor: central_deployment_agent
    source: http://example.com/url/to/plugin.zip

Cancellation Support

A workflow should have support for graceful (AKA Standard) cancellation. It is up to the workflow author to decide the semantics of graceful in this regard (and document them properly) - One workflow may merely stop the execution, while another may perform a rollback, and so on.

Standard workflows which don’t implement such support will simply ignore the cancellation request and continue executing the workflow. To implement cancellation support for standard workflows, some constructs from the cloudify.workflows.workflows_api module need to be used.

Importing the module is done as so:

from cloudify.workflows import api

Then, api.has_cancel_request() can be used to determine whether the workflow execution should be cancelled due to a standard cancellation request. If it returns True, the workflow should take whichever actions its author deems as a proper graceful cancellation, and then raise an api.ExecutionCancelled error.

Graph-based workflows have inherent support for graceful cancellation. Upon receiving such a request once the graph’s execute method has been called, the defined behavior is for the workflow execution to simply end - yet any tasks related to the execution which might have been running at the moment of cancellation will continue to run until they’re over.

Once the graph execution ends, the tasks_graph’s method execute will raise the api.ExecutionCancelled error.

For both types of workflows, it’s of course possible to catch api.ExecutionCancelled errors that may have been raised, thus allowing to perform any sort of cleanup or custom behavior before re-raising the error.

Deprecation Notice

The api.EXECUTION_CANCELLED_RESULT value, which may have been returned from a workflow to signal that it has cancelled sucessfully, is now deprecated. Raise the api.ExecutionCancelled error instead to indicate such an event.

Backwards Compatibility Notice

The Graph API will now raise an api.ExecutionCancelled error instead of returning the deprecated api.EXECUTION_CANCELLED_RESULT in the event of an execution cancellation. This means that any workflows which made any additional operations beyond the call to the graph’s execute method, should now use a try-finally clause to be able to perform these additional operations and still raise the approriate error once they’re done.

Resuming Support

Resuming workflows is a way to continue execution after the execution has failed or has been cancelled, or after a Cloudify Manager failure (loss of power, or any other scenario leading to an ungraceful shutdown of the management worker).

Resuming a workflow essentially means running it again, so the workflow author must make sure the workflow is able to cope with being re-run. This is the easiest to achieve when organizing the workflow in terms of a tasks graph. The tasks graphs may be stored and restored while keeping information about the state of each operation in the graph.

For convenience, Cloudify provides the make_or_get_graph function, which will take care of restoring a tasks graph during a resume, or will otherwise call a function which should create a new tasks graph. This function also requires a name parameter, so that if the workflow creates multiple tasks graphs, they can be distinguished.

Example: Creating a resumable workflow using a tasks graph

from cloudify.decorators import workflow
from cloudify.workflows.tasks_graph import make_or_get_graph

@workflow(resumable=True)  # declare that this workflow can be resumed
def my_workflow(ctx, parameter=None):
    graph = _my_workflow_graph(ctx, name='workflow', parameter=parameter)
    graph.execute()

@make_or_get_graph
def _my_workflow_graph(ctx, parameter):
    graph = ctx.graph_mode()
    graph.add(some_operation)
    return graph

Note that the workflow must declare that it is resumable by passing the resumable=True keyword to the workflow decorator. If not declared, the workflow will fail when a resume is attempted.

Workflows that do not use the tasks graph can be resumed as well, but the workflow author must implement themselves what should happen when a resume is attempted. The resume workflow context attribute (ctx.resume) says if the workflow is currently being resumed (True) or if the workflow is executed for the first time (False).

Step by Step Tutorial

In this tutorial we will create from scratch a custom graph-based workflow whose purpose is to execute plugin operations.

The tutorial will offer some guidance and reference about the following:

Requirements

Similarly to plugins, workflows require the cloudify-plugins-common package to be able to use the Cloudify workflows API and framework.

Implementing the Workflow

We’ll be implementing the workflow one step at a time, where in each step we’ll have a valid, working workflow, but with more features than the one in the previous step.

Expand me...
Expand me...
Expand me...
Expand me...
Expand me...

We could continue improving our workflow and extending its features, but in the scope of this tutorial, this last version of the workflow will be the one we’ll be using throughout the remaining tutorial sections.

Blueprint Mappings

The workflow plugin declaration will look like this:

plugins:
  my_workflow_plugin_name:
    executor: central_deployment_agent
    source: http://example.com/url/to/plugin.zip

The workflow mapping may look like so:

workflows:
  my_workflow:
    mapping: my_workflow_plugin_name.my_workflow_module_name.run_operation
    parameters:
      operation:
        description: the operation to execute
      type_name:
        description: the base type for filtering nodes
        default: cloudify.nodes.Root
      operation_kwargs:
        description: the operation kwargs
        default: {}
      is_node_operation:
        description: >
          is the operation a node operation or
          a relationship operation otherwise
        default: true

This will define a workflow named my_workflow, whose implementation is the run_operation workflow method we coded.

The workflow has four parameters declared:

Packaging the Workflow

Since workflows are joined to the blueprint the same way plugins do, they are also packaged the same way. Refer to the Plugin creation guide for more information.

Advanced Usage

What follows are code snippets showing some advanced API that is exposed by the workflow framework.

Task Handlers

Task handlers are callbacks you set on tasks. They get called when a task fails or succeeds.

from cloudify.decorators import workflow
from cloudify.workflows import ctx
from cloudify.workflows import tasks

@workflow
def use_task_handlers(**kwargs):

    graph = ctx.graph_mode()
    node = ctx.get_node('some_node')
    instance = next(node.instances)

    def on_success(task):
        instance.logger.info('Task {0} succeeded!'.format(task.id))
        # HandlerResult.cont() is the default for on_success.
        # If a task handler is defined for a task, it must return
        # a HandlerResult instance
        return tasks.HandlerResult.cont()

    def on_failure(task):
        instance.logger.info('Task {0} failed :('.format(task.id))
        # Handler result may override the default behavior.
        # If for example the task was to be retried,
        # this will cause the framework to ignore the failure
        # and move on. (the default in this case is HandlerResult.fail())
        return tasks.HandlerResult.ignore()

    task = instance.execute_operation('my_interface.my_task')
    task.on_success = on_success
    task.on_failure = on_failure

    graph.add_task(task)

    return graph.execute()

Deployment Modification

Deployment modification changes the data model to add or remove node instances, and returns the modified node instances for the workflow to operate on them. The built-in scale workflow makes use of this API to scale a node instance up or down.

from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def use_modify(**kwargs):
    new_number_of_instances = 12

    node_id = 'webserver_vm'
    node = ctx.get_node(node_id)
    if node.number_of_instances == new_number_of_instances:
        # no change is required
        return

    modification = ctx.deployment.start_modification({
        node.id: {
            'instances': number_of_new_instances
        }
    })

    going_up = node.number_of_instances < new_number_of_instances
    try:
        if going_up:
            # added.node_instances returns all node instances that are
            # affected by the increasing a node's number of instances.
            # Some are newly added and have their
            # instance.modification == 'added'.
            # Others are node instances that have new relationships
            # to the added node instances.
            added_and_related = modification.added.node_instances

            for instance in added_and_related:
                if instance.modification == 'added':
                    # do stuff
                    pass
                else:
                    # do other stuff
                    pass
        else:
            # removed.node_instances returns all node instances that are
            # affected by the decreasing a node's number of instances.
            # Some are removed and have their
            # instance.modification == 'removed'.
            # Others are node instances that will have relationships
            # to the removed node instances removed after calling
            # modification.finish().
            for instance in removed_and_related:
                if instance.modification == 'removed':
                    # do stuff
                    pass
                else:
                    # do other stuff
                    pass
    except:
        # Do stuff to restore the logical state and then
        # call this to restore that storage state
        modification.rollback()
        raise
    else:
        modification.finish()

Subgraphs (Experimental)

Subgraphs provide means for easier modeling of complex workflows, by grouping certain operations into their own subgraphs and creating dependencies between different subgraphs. Subgraphs expose the Task and Graph API’s, i.e. they have methods on them to create dependencies between tasks (Graph API) and dependencies can be created between them and other tasks (which may be subgraphs themselves) (Task API).

from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def use_subgraphs(**kwargs):

    graph = ctx.graph_mode()
    node = ctx.get_node('some_node')
    instance = next(node.instances)

    # A subgraph to create some node instance
    # creating a subgraph also adds it as a task to the graph from
    # which it was created.
    start_subgraph = graph.subgraph('some_start_subgraph')
    start_subgraph_sequence = start_subgraph.sequence()
    start_subgraph_sequence.add([
        instance.execute_opeartion('my_interface.create'),
        instance.execute_opeartion('my_interface.configure'),
        instance.execute_opeartion('my_interface.start')
    ])

    # A subgraph to upgrade some node instance
    upgrade_subgraph = graph.subgraph('some_upgrade_subgraph')
    upgrade_subgraph_sequence = upgrade_subgraph.sequence()
    upgrade_subgraph_sequence.add([
        instance.execute_opeartion('my_interface.upgrade.step1'),
        instance.execute_opeartion('my_interface.upgrade.step2'),
        instance.execute_opeartion('my_interface.upgrade.step3')
    ])

    # Start running operations on the upgrade subgraph
    # only when the start subgraph ended
    graph.add_dependency(upgrade_subgraph, start_subgraph)

    return graph.execute()

Contained Subgraph

Get all node instances that are contained in a node instance. The built-in heal workflow makes use of this API to calculate all node instances that belong to a cloudify.nodes.Compute node that should be healed.

from cloudify.decorators import workflow
from cloudify.workflows import ctx

@workflow
def use_contained_subgraph(**kwargs):
    node = ctx.get_node('some_node')
    instance = next(node.instances)
    # get node instances that are directly contained in the instance
    for contained_instance in instance.contained_instances:
        # do something
        pass

    # get node instances that are recursively contained in the instance
    # (including the instance itself)
    for contained_instance in instance.get_contained_subgraph():
        # do something
        passs