Writing Your Own Plugin

To illustrate how to write a plugin, this topic demonstrates how to create a plugin that is used to start a simple HTTP Web server using Python.

Creating A Plugin Project

Cloudify plugin projects are standard Python projects.

Each Cloudify plugin requires cloudify-plugins-common as a dependency, because it contains the necessary APIs for interacting with Cloudify.

cloudify-plugins-common documentation is located here.

Tip

You can use the Plugin Template to setup the repository for your plugin.

Setting up the setup.py File for the Plugin

For example:

from setuptools import setup

setup(
    name='python-http-webserver-plugin',
    version='1.0',
    author='Cloudify',
    packages=['python_webserver'],
    install_requires=['cloudify-plugins-common>=3.3'],
)

Writing Plugin Operations

Plugin operations are standard Python methods that are decorated with Cloudify’s operation decorator, so that Cloudify can identify them as plugin operations.

For the purpose of demonstrating how to create a plugin, creation of the start and stop operations for a Python HTTP webserver plugin are described.

The start operation will create an index.html file and then start a webserver using the following shell command: python -m SimpleHTTPServer which starts an HTTP server listening on port 8000.

The start & stop operations are placed in a tasks.py module in the python_webserver package in the project.

In the following example, the Cloudify logger, which is accessible using the ctx.logger object, is used.

python_webserver/tasks.py

import os

# import the ctx object
from cloudify import ctx

# import the operation decorator
from cloudify.decorators import operation

@operation
def start(**kwargs):
    with open('/tmp/index.html', 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    command = 'cd /tmp; nohup python -m SimpleHTTPServer > /dev/null 2>&1' \
              ' & echo $! > /tmp/python-webserver.pid'

    # use the ctx.logger object to send a formatted log with context
    # to the Manager. The displayed message is only part of the
    # log sent. A lot of context is supplied with the object.
    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)


# multiple operations that can be referred to afterwards
# in the blueprint are defined
@operation
def stop(**kwargs):
    try:
        with open('/tmp/python-webserver.pid', 'r') as f:
            pid = f.read().strip()
        ctx.logger.info('Stopping HTTP server [pid={0}]'.format(pid))
        os.system('kill -9 {0}'.format(pid))
    except IOError:
        ctx.logger.info('HTTP server is not running!')

Retrieving Node Properties

During the previous step, an HTTP webserver, which is now listening on port 8000, was started. If the port was specified in the blueprint, to use that port, the ctx object that represents the context of the invocation exposes the node’s properties, if the plugin’s operation was invoked in the context of a node.

The port property can be retrieved using the following code:

webserver_port = ctx.node.properties['port']

The updated start operation looks as follows:

from cloudify import ctx

@operation
def start(**kwargs):
    # retrieve the port from the node's properties
    webserver_port = ctx.node.properties['port']

    with open('/tmp/index.html', 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    # use the port that was withdrawn previously when running the Web server
    command = 'cd /tmp; nohup python -m SimpleHTTPServer {0} > /dev/null 2>&1' \
              ' & echo $! > /tmp/python-webserver.pid'.format(webserver_port)

    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)

Updating and Retrieving Runtime Properties

Runtime properties are properties that are set during runtime and are relevant to node instances. In the example, instead of having the Webserver root set to /tmp a temporary folder is created and its path is stored as a runtime property so that the stop operation reads it when stopping the Webserver.

import os
import tempfile

from cloudify import ctx
from cloudify.decorators import operation


@operation
def start(**kwargs):
    webserver_root = tempfile.mkdtemp()
    # a property, which is set during runtime, is added to the runtime
    # properties of that specific node instance
    ctx.instance.runtime_properties['webserver_root'] = webserver_root

    webserver_port = ctx.node.properties['port']

    with open(os.path.join(webserver_root, 'index.html'), 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    command = 'cd {0}; nohup python -m SimpleHTTPServer {1} > /dev/null 2>&1' \
              ' & echo $! > python-webserver.pid'.format(webserver_root, webserver_port)

    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)


@operation
def stop(**kwargs):
    # setting this runtime property enabled properties to be referred to that
    # are set during runtime from a different time in the node instance's lifecycle
    webserver_root = ctx.instance.runtime_properties['webserver_root']
    try:
        with open(os.path.join(webserver_root, 'python-webserver.pid'), 'r') as f:
            pid = f.read().strip()
        ctx.logger.info('Stopping HTTP server [pid={0}]'.format(pid))
        os.system('kill -9 {0}'.format(pid))
    except IOError:
        ctx.logger.info('HTTP server is not running!')

Runtime properties are saved in Cloudify storage after the plugin’s operation invocation is complete. (For which the @operation decorator is responsible).

Where it is important to immediately save runtime properties to Cloudify storage, call the ctx.update method.

For example:

ctx.instance.runtime_properties['prop1'] = 'This should be updated immediately!'
ctx.instance.update()

Asynchronous Operations

In many situations, such as creating resources in a Cloud environment, an operation might be waiting for an asynchronous activity to end (for example, waitng for a VM to start). Instead of implementing a wait-for mechanism in the operation that will wait until the asynchronous activity is over (which blocks the user who executed the operation from executing other operations in the meantime), operations can request to be retried after a specific time and to check whether the asynchronous activity is finished.

Requesting A Retry

from cloudify import ctx
from cloudify.decorators import operation
from cloudify import exceptions

@operation
def start(**kwargs):
    # start is executed for the first time, start the resource
    if ctx.operation.retry_number == 0:
        iaas.start_vm()

        # It will take some time until the VM will be running..
        # Request a retry after 30 seconds
        return ctx.operation.retry(message='Waiting for the VM to start..',
                                   retry_after=30)

    # This is a retried operation, check if the resource is running
    # and if not, request another retry
    if iaas.get_vm_state(...) != 'running':

        # Request a retry after 5 seconds
        return ctx.operation.retry(message='Still waiting for the VM to start..',
                                   retry_after=5)

    # Resource is up and running
    ctx.logger.info('VM started successfully!')
Tip

ctx.operation.max_retries can be configured in the Cloudify Manager blueprint. Additional information is located in the Workflows section.

Handling Errors

The Cloudify workflows framework distinguishes between two types of error:

In the current start operation, there is no verification that the Webserver was actually started and is listening on the specified port.

In this step, a verify_server_is_up method is implemented that generates a non-recoverable error if the server was not started within a reasonable period of time:

import os
import tempfile
import urllib2
import time

from cloudify import ctx
from cloudify.decorators import operation
# import the NonRecoverableError class
from cloudify.exceptions import NonRecoverableError


def verify_server_is_up(port):
    for attempt in range(15):
        try:
            response = urllib2.urlopen("http://localhost:{0}".format(port))
            response.read()
            break
        except BaseException:
            time.sleep(1)
    else:
        raise NonRecoverableError("Failed to start HTTP webserver")


@operation
def start(**kwargs):
    webserver_root = tempfile.mkdtemp()
    ctx.instance.runtime_properties['webserver_root'] = webserver_root

    webserver_port = ctx.node.properties['port']

    with open(os.path.join(webserver_root, 'index.html'), 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    command = 'cd {0}; nohup python -m SimpleHTTPServer {1} > /dev/null 2>&1' \
              ' & echo $! > python-webserver.pid'.format(webserver_root, webserver_port)

    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)

    # verify
    verify_server_is_up(webserver_port)

Error Details

When an operation fails due to an exception being generated (intentionally or unintentionally), the exception details are stored in the task_failed/task_reschduled events.

In some cases, you might want to explicitly raise a NonRecoverableError (for example) in response to some other exception that was raised in your operation code. That is quite simple to achieve as shown in the previous example. However, if you also want to preserve the original exception details in addition to the exception you raised, you can use the causes keyword argument when raising a RecoverableError or NonRecoverableError. This is demonstrated in the following example (which is based on the previous example).

import urllib2
import time
import sys

from cloudify.utils import exception_to_error_cause
from cloudify.exceptions import NonRecoverableError


def verify_server_is_up(port):
    for attempt in range(15):
        try:
            response = urllib2.urlopen("http://localhost:{0}".format(port))
            response.read()
            break
        except BaseException:
            _, last_ex, last_tb = sys.exc_info()
            time.sleep(1)
    else:
        raise NonRecoverableError(
            "Failed to start HTTP webserver",
            causes=[exception_to_error_cause(last_ex, last_tb)])

Plugin Metadata

Several attributes under ctx.plugin can be used to access details about the plugin involved in the current operation.

Testing Your Plugin

In most cases, the recommendation is to test your plugin’s logic using local workflows, and only then run them as part of a Cloudify deployment. We have supplied you with a nice and tidy decorator to do just that. The cloudify-plugins-common’s test_utils package enables you to do that. It is intuitive to use, but an example is provided below:

from cloudify.test_utils import workflow_test

@workflow_test(
                blueprint_path,
                copy_plugin_yaml,
                resources_to_copy,
                temp_dir_prefix,
                init_args,
                inputs,
                input_func_args,
                input_func_kwargs
                )
def test_my_task(self, cfy_local):
    pass

Workflow Test Arguments

The decorator sets up the environment for the test, and injects the environment as the first argument to the function. For example, if it is called cfy_local. You could run executions via cfy_local.execute('install'), or access storage via cfy_local.storage.

Passing Inputs

Passing inputs is not confined to static inputs:

Context Manager

The decorator functionality also exists as a context manager. However, the following features will not work:

Unit Testing

To unit test a specific function that needs a ctx object, you can use cloudify.mocks.MockCloudifyContext which is provided by cloudify-plugins-common.

Example: Using MockCloudifyContext

Assuming the plugin code is located in my_plugin.py:

from cloudify import ctx

@operation
def my_operation(**kwargs):
    prop1 = ctx.node.properties['node_property_1']
    ctx.logger.info('node_property_1={0}'.format(prop1))

Then use the following code to call the my_operation operation using a mock context object:

from cloudify.mocks import MockCloudifyContext
from cloudify.state import current_ctx
import my_plugin

props = {'node_property_1': 'value_1'}

mock_ctx = MockCloudifyContext(node_id='test_node_id',
                               node_name='test_node_name',
                               properties=props)

try:
    current_ctx.set(mock_ctx)
    my_plugin.my_operation()
finally:
    current_ctx.clear()

(Note: MockCloudifyContext accepts various additional parameters. Check the documentation for more information).

Now that the plugin is created, you need to incorporate it in your blueprint. For more information, see the Plugins specification.

Supplementary Information

The Context Object

The ctx context object contains contextual parameters that are mirrored from the blueprint, alongside additional functionality:

Properties Context Objects

Utility Context Objects

Cloud Plugins

The lifecycle start operation should store the following runtime properties for the Compute node instance:

See the Cloudify OpenStack plugin for reference.