Writing Your Own Plugin

To illustrate how to write a plugin, this topic demonstrates how to create a plugin that is used to start a simple HTTP Web server using Python.

Creating A Plugin Project

Cloudify plugin projects are standard Python projects.

Each Cloudify plugin requires cloudify-common as a dependency, because it contains the necessary APIs for interacting with Cloudify.

cloudify-common documentation is located here.

Tip

You can use the Plugin Template to set up the repository for your plugin.

Setting Up the setup.py File for the Plugin

For example:

from setuptools import setup

setup(
    name='python-http-webserver-plugin',
    version='1.0',
    author='Cloudify',
    packages=['python_webserver'],
    install_requires=['cloudify-common>=4.5.5'],
)

Best Practice

With the exception of cloudify-common, it is strongly recommended that all third-party dependencies (specified in the install_requires parameter) are version-pinned, rather than using a version range. That will guarantee that the plugin is always set up using the very same dependencies, thus avoiding cases of plugin code breaking due to incompatible upstream changes.

Alternatively, you could provide version ranges for dependencies, and pin them down during the Wagon creation process. Refer to the “Creating Wagons” document for more information.
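
As a rough illustration of the pinning recommendation, the sketch below lists dependencies in the form they would take in install_requires and flags any that are not pinned with ==. The package names and versions other than cloudify-common are hypothetical examples, and the find_unpinned helper is an assumption made for this illustration; it is not part of Cloudify or setuptools.

```python
import re

# Hypothetical dependency list, as it might appear in install_requires.
INSTALL_REQUIRES = [
    'cloudify-common>=4.5.5',  # allowed to float, per the recommendation
    'requests==2.31.0',        # pinned (hypothetical dependency)
    'pyyaml>=5.0',             # version range: flagged below
]


def find_unpinned(requirements, exempt=('cloudify-common',)):
    """Return the requirements that are not pinned with '=='."""
    unpinned = []
    for requirement in requirements:
        # The package name is the leading run of name characters.
        name = re.match(r'[A-Za-z0-9._-]+', requirement).group(0)
        if name not in exempt and '==' not in requirement:
            unpinned.append(requirement)
    return unpinned


print(find_unpinned(INSTALL_REQUIRES))  # ['pyyaml>=5.0']
```

A check of this kind could run in a unit test so that an unpinned dependency is caught before the plugin is packaged.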

Writing Plugin Operations

Plugin operations are standard Python methods.

To demonstrate how to create a plugin, this section describes the creation of the start and stop operations for a Python HTTP webserver plugin.

The start operation creates an index.html file and then starts a webserver using the shell command python -m SimpleHTTPServer, which starts an HTTP server listening on port 8000.

The start and stop operations are placed in a tasks.py module in the python_webserver package in the project.

In the following example, the Cloudify logger, which is accessible using the ctx.logger object, is used.

python_webserver/tasks.py

import os

from cloudify.decorators import operation

@operation
def start(ctx, **kwargs):
    with open('/tmp/index.html', 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    command = 'cd /tmp; nohup python -m SimpleHTTPServer > /dev/null 2>&1' \
              ' & echo $! > /tmp/python-webserver.pid'

    # use the ctx.logger object to send a formatted log with context
    # to the Manager. The displayed message is only part of the
    # log sent. A lot of context is supplied with the object.
    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)


@operation
def stop(ctx, **kwargs):
    try:
        with open('/tmp/python-webserver.pid', 'r') as f:
            pid = f.read().strip()
        ctx.logger.info('Stopping HTTP server [pid={0}]'.format(pid))
        os.system('kill -9 {0}'.format(pid))
    except IOError:
        ctx.logger.info('HTTP server is not running!')

Making operations resumable

If a workflow is interrupted (due to a Manager failure, e.g. a power loss, a task failure, or a user cancel request) and then resumed, agent operations are not interrupted: the Manager continues waiting for them to finish. Therefore, nothing needs to be done to make agent operations resumable after a Manager failure. Management worker operations, however, are retried, provided they are declared resumable. This declaration is made using the operation decorator: @operation(resumable=True).

For a management worker operation to be safe to retry, it must be idempotent. There is no generic way to write resumable management worker operations, but useful guidelines include:

- use runtime properties to store intermediate state
- keep operation functions short, doing one thing only
- make sure runtime property writes are persisted to storage using ctx.instance.update()
- avoid keeping state in memory without backing it up to persistent storage
- before performing OS-level operations, check whether they have already been done

Example of resumable operations

import subprocess

from cloudify.decorators import operation


@operation(resumable=True)
def operation_resumable(ctx):
    # named operation_resumable so that it does not shadow the imported
    # operation decorator
    # increase only if we haven't already increased the value
    if not ctx.instance.runtime_properties.get('value_written'):
        ctx.instance.runtime_properties['value'] += 1
        ctx.instance.runtime_properties['value_written'] = True
        ctx.instance.update()

    # avoid calling the external command if a previous run of this operation
    # has already done so
    if not ctx.instance.runtime_properties.get('data'):
        ctx.instance.runtime_properties['data'] = subprocess.check_output(
            ['external_command'])
        ctx.instance.update()

    # file write - idempotent operation
    with open('/tmp/hello.txt', 'w') as f:
        f.write(ctx.instance.runtime_properties.get('data'))


# compare this to the following operation which cannot be safely resumed
@operation
def operation_nonresumable(ctx):
    # non-guarded increment - if the operation restarts after this, the value
    # would have been increased twice
    ctx.instance.runtime_properties['value'] += 1
    ctx.instance.update()

    # if this function was retried, the external command would run again
    ctx.instance.runtime_properties['data'] = subprocess.check_output(
            ['external_command'])

    # opening with 'a' - append is not idempotent - it might have already
    # been written by a previous run
    with open('/tmp/hello.txt', 'a') as f:
        f.write(ctx.instance.runtime_properties.get('data'))
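
The effect of the guard can be observed without a Manager by running both styles of increment twice against a stand-in object. This is a sketch only: FakeInstance is a hypothetical replacement for ctx.instance written for this illustration, and its update() does nothing, whereas the real method persists the properties to Cloudify storage.

```python
class FakeInstance(object):
    """Hypothetical stand-in for ctx.instance; not a Cloudify class."""

    def __init__(self, runtime_properties):
        self.runtime_properties = runtime_properties

    def update(self):
        # The real ctx.instance.update() persists to Cloudify storage.
        pass


def guarded_increment(instance):
    # Increment only if a previous run has not already done so.
    if not instance.runtime_properties.get('value_written'):
        instance.runtime_properties['value'] += 1
        instance.runtime_properties['value_written'] = True
        instance.update()


def unguarded_increment(instance):
    # No record of earlier runs is kept, so every run increments again.
    instance.runtime_properties['value'] += 1
    instance.update()


guarded = FakeInstance({'value': 0})
unguarded = FakeInstance({'value': 0})

# Simulate the operation running, being interrupted, and running again.
for _ in range(2):
    guarded_increment(guarded)
    unguarded_increment(unguarded)

print(guarded.runtime_properties['value'])    # 1
print(unguarded.runtime_properties['value'])  # 2
```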

Retrieving Node Properties

The previous step started an HTTP webserver that listens on port 8000. If the port is instead specified in the blueprint, the operation can read it from the node’s properties: when a plugin’s operation is invoked in the context of a node, the ctx object that represents the invocation context exposes that node’s properties.

The port property can be retrieved using the following code:

webserver_port = ctx.node.properties['port']

The updated start operation looks as follows:

@operation
def start(ctx, **kwargs):
    # retrieve the port from the node's properties
    webserver_port = ctx.node.properties['port']

    with open('/tmp/index.html', 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    # use the port that was retrieved previously when running the Web server
    command = 'cd /tmp; nohup python -m SimpleHTTPServer {0} > /dev/null 2>&1' \
              ' & echo $! > /tmp/python-webserver.pid'.format(webserver_port)

    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)
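
Note that the SimpleHTTPServer module exists only in Python 2; in Python 3 the equivalent module is http.server. The following minimal sketch assumes Python 3.7 or later (for the --directory option) and uses a plain dict as a stand-in for ctx.node.properties. It builds the command as an argument list suitable for subprocess.Popen rather than as a shell string. The build_server_command helper and its fallback to port 8000 are illustrative assumptions, not part of the plugin described here.

```python
import sys


def build_server_command(properties, root, default_port=8000):
    """Build the argument list that starts http.server on the given port."""
    # Fall back to a default when the blueprint does not define a port.
    port = properties.get('port', default_port)
    return [sys.executable, '-m', 'http.server', str(port),
            '--directory', root]


# A plain dict stands in for ctx.node.properties here.
command = build_server_command({'port': 8080}, '/tmp')
print(command[1:])  # ['-m', 'http.server', '8080', '--directory', '/tmp']
```

Passing a list to subprocess.Popen avoids shell quoting issues, and the returned Popen object exposes the process ID directly through its pid attribute.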

Updating and Retrieving Runtime Properties

Runtime properties are properties that are set during runtime and are relevant to node instances. In this example, instead of setting the webserver root to /tmp, a temporary folder is created and its path is stored as a runtime property, so that the stop operation can read it when stopping the webserver.

import os
import tempfile

from cloudify.decorators import operation

@operation
def start(ctx, **kwargs):
    webserver_root = tempfile.mkdtemp()
    # a property, which is set during runtime, is added to the runtime
    # properties of that specific node instance
    ctx.instance.runtime_properties['webserver_root'] = webserver_root

    webserver_port = ctx.node.properties['port']

    with open(os.path.join(webserver_root, 'index.html'), 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    command = 'cd {0}; nohup python -m SimpleHTTPServer {1} > /dev/null 2>&1' \
              ' & echo $! > python-webserver.pid'.format(webserver_root, webserver_port)

    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)


@operation
def stop(ctx, **kwargs):
    # runtime properties set earlier in the node instance's lifecycle (here,
    # by the start operation) can be read back in later operations
    webserver_root = ctx.instance.runtime_properties['webserver_root']
    try:
        with open(os.path.join(webserver_root, 'python-webserver.pid'), 'r') as f:
            pid = f.read().strip()
        ctx.logger.info('Stopping HTTP server [pid={0}]'.format(pid))
        os.system('kill -9 {0}'.format(pid))
    except IOError:
        ctx.logger.info('HTTP server is not running!')
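
As an alternative to shelling out to kill -9, the PID file can be read and the process signalled directly from Python. The following is a sketch under stated assumptions: stop_from_pid_file is a hypothetical helper, it takes the webserver root as a plain argument instead of reading it from ctx, and it sends SIGTERM rather than SIGKILL so that the server has a chance to exit cleanly. It targets POSIX systems.

```python
import os
import signal


def stop_from_pid_file(webserver_root, pid_file_name='python-webserver.pid'):
    """Send SIGTERM to the PID recorded in the PID file.

    Returns the PID that was signalled, or None if no PID file exists.
    os.kill raises an OSError subclass if the process no longer exists.
    """
    pid_path = os.path.join(webserver_root, pid_file_name)
    try:
        with open(pid_path, 'r') as f:
            pid = int(f.read().strip())
    except IOError:
        # No PID file: the server was never started or is already stopped.
        return None
    os.kill(pid, signal.SIGTERM)
    # Remove the PID file so that a repeated stop does not signal a stale PID.
    os.remove(pid_path)
    return pid
```

Inside the stop operation, the root would come from ctx.instance.runtime_properties['webserver_root'], as in the example above.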

Runtime properties are saved in Cloudify storage after the plugin’s operation invocation is complete.

Where it is important to immediately save runtime properties to Cloudify storage, call the ctx.instance.update method.

For example:

ctx.instance.runtime_properties['prop1'] = 'This should be updated immediately!'
ctx.instance.update()

Asynchronous Operations

In many situations, such as creating resources in a cloud environment, an operation might be waiting for an asynchronous activity to end (for example, waiting for a VM to start). Instead of implementing a wait-for mechanism in the operation that will wait until the asynchronous activity is over (which blocks the worker process that executes the operation from executing other operations in the meantime), operations can request to be retried after a specified length of time to check whether the asynchronous activity has finished.

Requesting A Retry

from cloudify.decorators import operation
from cloudify import exceptions

@operation
def start(ctx, **kwargs):
    # start is executed for the first time, start the resource
    if ctx.operation.retry_number == 0:
        iaas.start_vm()

        # It will take some time until the VM is running.
        # Request a retry after 30 seconds
        return ctx.operation.retry(message='Waiting for the VM to start..',
                                   retry_after=30)

    # This is a retried operation, check if the resource is running
    # and if not, request another retry
    if iaas.get_vm_state(...) != 'running':

        # Request a retry after 5 seconds
        return ctx.operation.retry(message='Still waiting for the VM to start..',
                                   retry_after=5)

    # Resource is up and running
    ctx.logger.info('VM started successfully!')

Tip

ctx.operation.max_retries can be configured in the Cloudify Manager blueprint. Additional information is located in the Workflows section.
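
To make the flow across repeated invocations easier to follow, the sketch below imitates it with stand-ins. Everything here is hypothetical: FakeOperation replaces ctx.operation, the VM states come from a fixed list instead of an IaaS API, and run_with_retries is only a crude imitation of an engine re-invoking the operation. It ignores the requested delays and does not reflect how Cloudify schedules retries internally.

```python
class FakeOperation(object):
    """Hypothetical stand-in for ctx.operation; not a Cloudify class."""

    def __init__(self):
        self.retry_number = 0

    def retry(self, message=None, retry_after=None):
        # This stand-in only returns a marker describing the request.
        return ('retry', message, retry_after)


def start(operation, vm_states):
    """Operation body; vm_states yields one (made-up) VM state per check."""
    if operation.retry_number == 0:
        # First invocation: the VM was just started, so ask for a retry.
        return operation.retry(message='Waiting for the VM to start..',
                               retry_after=30)
    if next(vm_states) != 'running':
        return operation.retry(message='Still waiting for the VM to start..',
                               retry_after=5)
    return 'started'


def run_with_retries(max_retries=10):
    """Re-invoke the operation until it stops requesting retries."""
    operation = FakeOperation()
    vm_states = iter(['pending', 'pending', 'running'])
    delays = []
    for attempt in range(max_retries + 1):
        operation.retry_number = attempt
        result = start(operation, vm_states)
        if result == 'started':
            return attempt, delays
        delays.append(result[2])
    raise RuntimeError('operation did not finish within max_retries')


print(run_with_retries())  # (3, [30, 5, 5])
```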

Handling Errors

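The Cloudify workflows framework distinguishes between two types of error, recoverable and non-recoverable, which correspond to the RecoverableError and NonRecoverableError exception classes.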

In the current start operation, there is no verification that the Webserver was actually started and is listening on the specified port.

In this step, a verify_server_is_up method is implemented that generates a non-recoverable error if the server was not started within a reasonable period of time:

import os
import tempfile
import urllib2
import time


# import the NonRecoverableError class
from cloudify.exceptions import NonRecoverableError
from cloudify.decorators import operation


def verify_server_is_up(port):
    for attempt in range(15):
        try:
            response = urllib2.urlopen("http://localhost:{0}".format(port))
            response.read()
            break
        except BaseException:
            time.sleep(1)
    else:
        raise NonRecoverableError("Failed to start HTTP webserver")


@operation
def start(ctx, **kwargs):
    webserver_root = tempfile.mkdtemp()
    ctx.instance.runtime_properties['webserver_root'] = webserver_root

    webserver_port = ctx.node.properties['port']

    with open(os.path.join(webserver_root, 'index.html'), 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    command = 'cd {0}; nohup python -m SimpleHTTPServer {1} > /dev/null 2>&1' \
              ' & echo $! > python-webserver.pid'.format(webserver_root, webserver_port)

    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)

    # verify
    verify_server_is_up(webserver_port)
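
The urllib2 module used above exists only in Python 2. A rough Python 3 equivalent of the verification function is sketched below using urllib.request. To keep the sketch runnable without cloudify-common installed, ServerNotUpError stands in for NonRecoverableError; the attempts and delay parameters are additions made for this illustration.

```python
import time
import urllib.request


class ServerNotUpError(Exception):
    """Stand-in for cloudify.exceptions.NonRecoverableError in this sketch."""


def verify_server_is_up(port, attempts=15, delay=1):
    """Poll the local server until it answers, or raise after all attempts."""
    url = 'http://localhost:{0}'.format(port)
    for _ in range(attempts):
        try:
            with urllib.request.urlopen(url, timeout=5) as response:
                response.read()
            return
        except Exception:
            # Connection refused, timeout, or an HTTP error: wait and retry.
            time.sleep(delay)
    raise ServerNotUpError('Failed to start HTTP webserver')
```

In a plugin, the final raise would use NonRecoverableError so that the workflow does not retry the operation.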

Error Details

In some cases, you might want to explicitly raise a Cloudify error in response to some other exception that was raised in your operation code. That is simple to achieve as shown in the previous example. However, if you also want to preserve the original exception details in addition to the exception you raised, you can use the causes keyword argument when raising a RecoverableError or NonRecoverableError. This is demonstrated in the following example (which is based on the previous example).

import urllib2
import time
import sys

from cloudify.utils import exception_to_error_cause
from cloudify.exceptions import NonRecoverableError


def verify_server_is_up(port):
    for attempt in range(15):
        try:
            response = urllib2.urlopen("http://localhost:{0}".format(port))
            response.read()
            break
        except BaseException:
            _, last_ex, last_tb = sys.exc_info()
            time.sleep(1)
    else:
        raise NonRecoverableError(
            "Failed to start HTTP webserver",
            causes=[exception_to_error_cause(last_ex, last_tb)])
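
To get a feel for what preserving the original exception details involves, here is a sketch that uses only the standard library. The describe_cause helper is a hypothetical stand-in written for this illustration. It is not the implementation of cloudify.utils.exception_to_error_cause, and the dictionary keys it returns are assumptions.

```python
import sys
import traceback


def describe_cause(exception, tb):
    """Capture an exception's type, message, and formatted traceback."""
    return {
        'type': type(exception).__name__,
        'message': str(exception),
        'traceback': ''.join(
            traceback.format_exception(type(exception), exception, tb)),
    }


try:
    int('not a number')
except ValueError:
    # sys.exc_info() must be read while the exception is being handled.
    _, last_ex, last_tb = sys.exc_info()

cause = describe_cause(last_ex, last_tb)
print(cause['type'])  # ValueError
```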

Plugin Metadata

Several attributes under ctx.plugin can be used to access details about the plugin involved in the current operation.

Testing Your Plugin

In most cases, the recommendation is to test your plugin’s logic using local workflows, and only then run it as part of a Cloudify deployment. The test_utils package in cloudify-common provides a decorator for this purpose. An example is provided below:

from cloudify.test_utils import workflow_test

@workflow_test(blueprint_path,
               copy_plugin_yaml,
               resources_to_copy,
               temp_dir_prefix,
               init_args,
               inputs,
               input_func_args,
               input_func_kwargs
               )
def test_my_task(self, cfy_local):
    pass

Workflow Test Arguments

The decorator sets up the environment for the test, and injects the environment as the first argument to the function. For example, if the argument is called cfy_local, you can run executions via cfy_local.execute('install'), or access storage via cfy_local.storage.

Passing Inputs

Passing inputs is not confined to static inputs:

Context Manager

The decorator functionality also exists as a context manager. However, the following features will not work:

Unit Testing

To unit test a specific function that needs a ctx object, you can use cloudify.mocks.MockCloudifyContext which is provided by cloudify-common.

Example: Using MockCloudifyContext

Assuming the plugin code is located in my_plugin.py:

from cloudify.decorators import operation

@operation
def my_operation(ctx, **kwargs):
    prop1 = ctx.node.properties['node_property_1']
    ctx.logger.info('node_property_1={0}'.format(prop1))

Then use the following code to call the my_operation operation using a mock context object:

from cloudify.mocks import MockCloudifyContext
import my_plugin

props = {'node_property_1': 'value_1'}

mock_ctx = MockCloudifyContext(node_id='test_node_id',
                               node_name='test_node_name',
                               properties=props)

my_plugin.my_operation(mock_ctx)

(Note: MockCloudifyContext accepts various additional parameters. Check the documentation for more information).

Example: Using MockCloudifyContext as a threadlocal

Certain plugins, written for older versions of Cloudify, rely on the ctx object being available as a Python threadlocal. For example:

from cloudify import ctx
from cloudify.decorators import operation

@operation
def my_operation(**kwargs):
    prop1 = ctx.node.properties['node_property_1']
    ctx.logger.info('node_property_1={0}'.format(prop1))

In such cases, you can use the following code to test:

from cloudify.mocks import MockCloudifyContext
from cloudify.state import current_ctx
import my_plugin

props = {'node_property_1': 'value_1'}

mock_ctx = MockCloudifyContext(node_id='test_node_id',
                               node_name='test_node_name',
                               properties=props)

try:
    current_ctx.set(mock_ctx)
    my_plugin.my_operation()
finally:
    current_ctx.clear()

Now that the plugin is created, you need to incorporate it in your blueprint. For more information, see the Plugins specification.

Best Practices

Overall Structure

Interface First

The most important part of designing a plugin is designing its TOSCA “view”. Even the most comprehensive plugin is almost entirely useless if users can’t make proper use of it within blueprints. Therefore, the first and foremost item to focus on should be the node types that are involved. The rationale:

Layered Approach

We propose the following layered approach for designing and implementing a Cloudify plugin:

The Third-Party SDK Layer

This layer only applies for cases in which there exists a third-party Python-based API to the system we’re interacting with. Examples:

* The OpenStack plugin (using the official OpenStack API libraries for Python)
* The AWS-SDK plugin (using boto3)
* The GCP plugin (using the official Python-based GCP API)

This layer is not a part of the plugin’s codebase; instead, it is declared as a set of dependencies in the plugin’s setup.py file.

The Context-Independent Layer

Here comes the implementation of the plugin’s functionality, optionally using third-party SDKs. The most important design principle here is context independence, which means that the code makes no assumptions about the context in which it is being run. As a consequence:

The rationale behind this principle is that we want to be able to use this code from anywhere, not only within a Cloudify operation or workflow, thus:

This layer should be designed with reuse and extensibility in mind.

The Cloudify Integration Layer

This should be the simplest layer in the plugin. A good indication of a well-designed plugin is how small this layer is: the more “responsibility” included in this layer, the more likely it is that the design of the context-independent layer could be improved.

In this layer, ideally, we would only have the Cloudify operation functions, doing a minimal amount of work, delegating to the lower layer for processing, and then properly handling return values as well as exceptions.
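
The split between the context-independent layer and the integration layer can be sketched as follows. All names here (WebServerConfig, start_command, the command runtime property) are hypothetical and chosen for illustration. In a real plugin the start function would carry the @operation decorator, which is omitted so that the sketch runs without cloudify-common installed.

```python
import logging


class WebServerConfig(object):
    """Context-independent layer: no reference to ctx anywhere."""

    def __init__(self, port, root, logger=None):
        self.port = port
        self.root = root
        # Any logger will do; the caller decides which one to inject.
        self.logger = logger or logging.getLogger(__name__)

    def start_command(self):
        self.logger.info('Building start command for port %s', self.port)
        return 'cd {0}; nohup python -m SimpleHTTPServer {1}'.format(
            self.root, self.port)


def start(ctx, **kwargs):
    """Integration layer: unpack ctx, delegate, and store the result."""
    server = WebServerConfig(
        port=ctx.node.properties['port'],
        root=ctx.instance.runtime_properties['webserver_root'],
        logger=ctx.logger)
    ctx.instance.runtime_properties['command'] = server.start_command()
```

Because WebServerConfig never touches ctx, it can be exercised from a plain script or a unit test, while the operation function stays only a few lines long.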

Referring to ctx

The ctx object is available to operations in two ways:

In previous versions of Cloudify, developers were instructed to follow the threadlocal approach:

from cloudify import ctx
from cloudify.decorators import operation

...

@operation
def my_operation(input1, input2, **kwargs):
  ctx.logger.info('Hello')

While this approach is straightforward when it comes to developing operations, it is cumbersome when writing unit tests. That’s because the ctx object needs to be placed as a threadlocal on the current thread and cleaned up afterwards. In general, code that relies on threadlocal variables is harder to call.

The preferred approach is to avoid importing ctx altogether and instead provide ctx as a keyword argument:

@operation
def my_operation(ctx, input1, input2, **kwargs):
  ctx.logger.info('Hello')

Downloading Resources using ctx.download_resource

The download_resource function may optionally receive a target_path argument. If it is not specified, the resource is downloaded into a new temporary directory, preserving the original resource’s base name.

For example, the following code:

ctx.download_resource('resources/hello.html')

results in a randomly named directory being created inside the operating system’s temporary directory, with the file hello.html downloaded into it (for example: /tmp/tmp123456/hello.html).

In that case, it is important to remember to not only delete the temporary resource once you’re done with it, but to also delete its parent directory (/tmp/tmp123456 in the example above).

A preferred approach is to provide the target_path argument, and properly dispose of the resource when it’s not needed anymore. For example:

import tempfile
import os

...
...

with tempfile.NamedTemporaryFile(delete=False) as f:
  f.close()
  ctx.download_resource('resources/hello.html', target_path=f.name)
  ...
  ...

os.remove(f.name)
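
One way to make the cleanup robust against exceptions is to wrap the download in a context manager, so that the temporary file is removed even if the code using it fails. This is a sketch, not part of cloudify-common: downloaded_resource is a hypothetical helper, and it assumes only that the download callable accepts a resource path and a target_path keyword argument, as ctx.download_resource does.

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def downloaded_resource(download, resource_path):
    """Download a resource to a temporary file and delete it afterwards.

    `download` is expected to behave like ctx.download_resource.
    """
    # mkstemp creates the file and returns an open descriptor; close it so
    # that the download function can write to the path itself.
    fd, target_path = tempfile.mkstemp()
    os.close(fd)
    try:
        download(resource_path, target_path=target_path)
        yield target_path
    finally:
        # Runs on normal exit and on exceptions alike.
        if os.path.exists(target_path):
            os.remove(target_path)
```

Within an operation, this could be used as: with downloaded_resource(ctx.download_resource, 'resources/hello.html') as path, followed by the code that reads the file.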

Supplementary Information

The Context Object

The ctx context object contains contextual parameters that are mirrored from the blueprint, alongside additional functionality:

Properties Context Objects

Utility Context Objects

Cloud Plugins

The lifecycle start operation should store the following runtime properties for the cloudify.nodes.Compute node instance:

See the Cloudify OpenStack plugin for reference.