Writing Your Own Plugin
This topic illustrates how to write a plugin by creating one that starts a simple HTTP web server using Python.
Creating A Plugin Project
Cloudify plugin projects are standard Python projects.
Each Cloudify plugin requires cloudify-plugins-common as a dependency, because it contains the necessary APIs for interacting with Cloudify.
The cloudify-plugins-common documentation is located here.
You can use the Plugin Template to set up the repository for your plugin.
Setting up the setup.py File for the Plugin
For example:
from setuptools import setup

setup(
    name='python-http-webserver-plugin',
    version='1.0',
    author='Cloudify',
    packages=['python_webserver'],
    install_requires=['cloudify-plugins-common>=3.3'],
)
Writing Plugin Operations
Plugin operations are standard Python functions that are decorated with Cloudify's operation decorator, so that Cloudify can identify them as plugin operations.
To demonstrate, this section describes the start and stop operations of a Python HTTP webserver plugin.
The start operation creates an index.html file and then starts a webserver using the shell command python -m SimpleHTTPServer, which starts an HTTP server listening on port 8000.
The start and stop operations are placed in a tasks.py module in the python_webserver package in the project.
The following example uses the Cloudify logger, which is accessible through the ctx.logger object.
python_webserver/tasks.py
import os

# import the ctx object
from cloudify import ctx

# import the operation decorator
from cloudify.decorators import operation


@operation
def start(**kwargs):
    with open('/tmp/index.html', 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    command = 'cd /tmp; nohup python -m SimpleHTTPServer > /dev/null 2>&1' \
              ' & echo $! > /tmp/python-webserver.pid'

    # use the ctx.logger object to send a formatted log with context
    # to the Manager. The displayed message is only part of the
    # log sent. A lot of context is supplied with the object.
    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)


# a plugin can define multiple operations, each of which
# can be referred to afterwards in the blueprint
@operation
def stop(**kwargs):
    try:
        with open('/tmp/python-webserver.pid', 'r') as f:
            pid = f.read().strip()
        ctx.logger.info('Stopping HTTP server [pid={0}]'.format(pid))
        os.system('kill -9 {0}'.format(pid))
    except IOError:
        ctx.logger.info('HTTP server is not running!')
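The SimpleHTTPServer module exists only in Python 2; in Python 3 the equivalent module is http.server. The following is a minimal sketch, not part of the plugin above, of the same start/stop idea using only the standard library, with subprocess in place of a shell command. The helper names start_server and stop_server are hypothetical, and the Cloudify decorator and logger are left out so that the snippet runs on its own.

```python
import os
import subprocess
import sys


def start_server(root, port):
    """Write index.html into root and serve root on the given port.

    Returns the subprocess.Popen handle of the server process.
    """
    with open(os.path.join(root, 'index.html'), 'w') as f:
        f.write('<p>Hello Cloudify!</p>')
    return subprocess.Popen(
        [sys.executable, '-m', 'http.server', str(port)],
        cwd=root,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def stop_server(process, timeout=5):
    """Ask the server to terminate, force-kill it if it does not exit
    within the timeout, and return its exit code."""
    process.terminate()
    try:
        return process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        return process.wait()
```

In a real plugin, start and stop run as separate invocations, so the process handle is not available to stop. The PID (process.pid) would still need to be persisted, as the PID file does in the example above.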
Retrieving Node Properties
During the previous step, an HTTP webserver was started, and it is now listening on port 8000.
To use a port that is specified in the blueprint instead, read it from the node's properties. The ctx object, which represents the context of the invocation, exposes the node's properties when the plugin's operation is invoked in the context of a node.
The port property can be retrieved using the following code:
webserver_port = ctx.node.properties['port']
The updated start operation looks as follows:
import os

from cloudify import ctx
from cloudify.decorators import operation


@operation
def start(**kwargs):
    # retrieve the port from the node's properties
    webserver_port = ctx.node.properties['port']

    with open('/tmp/index.html', 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    # use the port that was retrieved previously when running the Web server
    command = 'cd /tmp; nohup python -m SimpleHTTPServer {0} > /dev/null 2>&1' \
              ' & echo $! > /tmp/python-webserver.pid'.format(webserver_port)

    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)
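Indexing the properties with ['port'] raises a KeyError when the blueprint does not define the property. The following is a small sketch of a more defensive lookup, assuming only that the properties object behaves like a dictionary (as the indexing above suggests). The helper name resolve_port and the default of 8000 are illustrative choices, not part of the Cloudify API.

```python
def resolve_port(properties, default=8000):
    """Return a validated TCP port from a node-properties-like mapping.

    Falls back to `default` when no 'port' key is present.
    """
    port = int(properties.get('port', default))
    if not 1 <= port <= 65535:
        raise ValueError('Invalid port: {0}'.format(port))
    return port
```

Inside an operation, this could be called as resolve_port(ctx.node.properties).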
Updating and Retrieving Runtime Properties
Runtime properties are properties that are set during runtime and are relevant to node instances.
In this example, instead of setting the webserver root to /tmp, a temporary folder is created and its path is stored as a runtime property, so that the stop operation can read it when stopping the webserver.
import os
import tempfile

from cloudify import ctx
from cloudify.decorators import operation


@operation
def start(**kwargs):
    webserver_root = tempfile.mkdtemp()
    # a property, which is set during runtime, is added to the runtime
    # properties of that specific node instance
    ctx.instance.runtime_properties['webserver_root'] = webserver_root

    webserver_port = ctx.node.properties['port']

    with open(os.path.join(webserver_root, 'index.html'), 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    command = 'cd {0}; nohup python -m SimpleHTTPServer {1} > /dev/null 2>&1' \
              ' & echo $! > python-webserver.pid'.format(webserver_root, webserver_port)

    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)


@operation
def stop(**kwargs):
    # runtime properties make it possible to read, at a later point in the
    # node instance's lifecycle, a value that was set earlier during runtime
    webserver_root = ctx.instance.runtime_properties['webserver_root']
    try:
        with open(os.path.join(webserver_root, 'python-webserver.pid'), 'r') as f:
            pid = f.read().strip()
        ctx.logger.info('Stopping HTTP server [pid={0}]'.format(pid))
        os.system('kill -9 {0}'.format(pid))
    except IOError:
        ctx.logger.info('HTTP server is not running!')
Runtime properties are saved in Cloudify storage after the plugin's operation invocation is complete (the @operation decorator is responsible for this).
When it is important to save runtime properties to Cloudify storage immediately, call the ctx.instance.update method.
For example:
ctx.instance.runtime_properties['prop1'] = 'This should be updated immediately!'
ctx.instance.update()
Asynchronous Operations
In many situations, such as creating resources in a Cloud environment, an operation might need to wait for an asynchronous activity to end (for example, waiting for a VM to start). Implementing a wait-for mechanism inside the operation blocks the user who executed the operation from executing other operations in the meantime. Instead, an operation can request to be retried after a specific time, and check on each retry whether the asynchronous activity has finished.
Requesting A Retry
from cloudify import ctx
from cloudify.decorators import operation
from cloudify import exceptions


# note: iaas is a placeholder for your cloud provider's client
@operation
def start(**kwargs):
    # start is executed for the first time, start the resource
    if ctx.operation.retry_number == 0:
        iaas.start_vm()

        # It will take some time until the VM is running.
        # Request a retry after 30 seconds
        return ctx.operation.retry(message='Waiting for the VM to start..',
                                   retry_after=30)

    # This is a retried operation, check if the resource is running
    # and if not, request another retry
    if iaas.get_vm_state(...) != 'running':

        # Request a retry after 5 seconds
        return ctx.operation.retry(message='Still waiting for the VM to start..',
                                   retry_after=5)

    # Resource is up and running
    ctx.logger.info('VM started successfully!')
ctx.operation.max_retries can be configured in the Cloudify Manager blueprint. Additional information is located in the Workflows section.
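To make the control flow of a retried operation easier to follow, here is a toy model that runs without Cloudify. It is not how Cloudify implements retries. RetryRequest, run_with_retries and make_start_operation are hypothetical names invented for this sketch, and the retry number is passed as a plain argument instead of being read from ctx.operation.retry_number.

```python
import time


class RetryRequest(object):
    """Hypothetical stand-in for the value returned by ctx.operation.retry()."""

    def __init__(self, message, retry_after):
        self.message = message
        self.retry_after = retry_after


def run_with_retries(operation, max_retries, sleep=time.sleep):
    """Call operation(retry_number) until it stops asking for a retry."""
    for retry_number in range(max_retries + 1):
        result = operation(retry_number)
        if not isinstance(result, RetryRequest):
            return result
        if retry_number < max_retries:
            sleep(result.retry_after)
    raise RuntimeError('Gave up after {0} retries'.format(max_retries))


def make_start_operation(polls_until_running):
    """Build a fake start operation whose VM needs some polls to boot."""
    state = {'polls_left': polls_until_running}

    def start(retry_number):
        if retry_number == 0:
            # first invocation: kick off the asynchronous activity
            return RetryRequest('Waiting for the VM to start..', 30)
        if state['polls_left'] > 0:
            # retried invocation: the resource is not ready yet
            state['polls_left'] -= 1
            return RetryRequest('Still waiting for the VM to start..', 5)
        return 'running'

    return start
```

Passing a custom sleep function (for example, a list's append method) lets the waits be inspected without actually pausing.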
Handling Errors
The Cloudify workflows framework distinguishes between two types of error:
- Recoverable errors - Cloudify workflows retry operations that raise such errors. All Python exceptions are treated as recoverable errors, except those explicitly raised as non-recoverable.
- Non-recoverable errors - Errors that should not be retried. The workflow determines how to handle them.
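As a rough illustration of choosing between the two error types, the sketch below reads a PID file and raises a recoverable error when the file is not readable yet (retrying may help) and a non-recoverable error when its content is invalid (retrying will not help). The helper read_pid is hypothetical. The fallback classes are stand-ins defined only so the snippet runs where Cloudify is not installed.

```python
try:
    from cloudify.exceptions import NonRecoverableError, RecoverableError
except ImportError:
    # Stand-ins so the sketch runs without Cloudify installed.
    class RecoverableError(Exception):
        pass

    class NonRecoverableError(Exception):
        pass


def read_pid(pid_file):
    """Return the PID stored in pid_file as an int."""
    try:
        with open(pid_file, 'r') as f:
            content = f.read().strip()
    except IOError as e:
        # The file may simply not have been written yet: worth a retry.
        raise RecoverableError('PID file not readable yet: {0}'.format(e))
    if not content.isdigit():
        # Retrying cannot repair a corrupt file.
        raise NonRecoverableError('Corrupt PID file: {0!r}'.format(content))
    return int(content)
```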
In the current start operation, there is no verification that the webserver was actually started and is listening on the specified port.
In this step, a verify_server_is_up function is implemented that raises a non-recoverable error if the server was not started within a reasonable period of time:
import os
import tempfile
import urllib2
import time

from cloudify import ctx
from cloudify.decorators import operation
# import the NonRecoverableError class
from cloudify.exceptions import NonRecoverableError


def verify_server_is_up(port):
    for attempt in range(15):
        try:
            response = urllib2.urlopen("http://localhost:{0}".format(port))
            response.read()
            break
        except Exception:
            time.sleep(1)
    else:
        # the loop finished without a successful request
        raise NonRecoverableError("Failed to start HTTP webserver")


@operation
def start(**kwargs):
    webserver_root = tempfile.mkdtemp()
    ctx.instance.runtime_properties['webserver_root'] = webserver_root

    webserver_port = ctx.node.properties['port']

    with open(os.path.join(webserver_root, 'index.html'), 'w') as f:
        f.write('<p>Hello Cloudify!</p>')

    command = 'cd {0}; nohup python -m SimpleHTTPServer {1} > /dev/null 2>&1' \
              ' & echo $! > python-webserver.pid'.format(webserver_root, webserver_port)

    ctx.logger.info('Starting HTTP server using: {0}'.format(command))
    os.system(command)

    # verify
    verify_server_is_up(webserver_port)
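The urllib2 module used above exists only in Python 2. As a sketch of the same check for Python 3, the version below uses urllib.request from the standard library. The attempts and delay parameters are additions for illustration, and the fallback NonRecoverableError class is a stand-in so the snippet runs where Cloudify is not installed.

```python
import time
from urllib.request import urlopen

try:
    from cloudify.exceptions import NonRecoverableError
except ImportError:
    # Stand-in so the sketch runs without Cloudify installed.
    class NonRecoverableError(Exception):
        pass


def verify_server_is_up(port, attempts=15, delay=1):
    """Poll http://localhost:<port> until it answers or attempts run out."""
    url = 'http://localhost:{0}'.format(port)
    for _ in range(attempts):
        try:
            urlopen(url, timeout=5).read()
            return
        except Exception:
            # connection refused, timeout, HTTP error: wait and try again
            time.sleep(delay)
    raise NonRecoverableError('Failed to start HTTP webserver')
```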
Error Details
When an operation fails because an exception was raised (intentionally or unintentionally), the exception details are stored in the task_failed/task_rescheduled events.
In some cases, you might want to explicitly raise a NonRecoverableError (for example) in response to some other exception that was raised in your operation code. That is quite simple to achieve, as shown in the previous example. However, if you also want to preserve the original exception details in addition to the exception you raised, you can use the causes keyword argument when raising a RecoverableError or NonRecoverableError. This is demonstrated in the following example (which is based on the previous example).
import urllib2
import time
import sys

from cloudify.utils import exception_to_error_cause
from cloudify.exceptions import NonRecoverableError


def verify_server_is_up(port):
    for attempt in range(15):
        try:
            response = urllib2.urlopen("http://localhost:{0}".format(port))
            response.read()
            break
        except Exception:
            # keep the details of the most recent failure
            _, last_ex, last_tb = sys.exc_info()
            time.sleep(1)
    else:
        raise NonRecoverableError(
            "Failed to start HTTP webserver",
            causes=[exception_to_error_cause(last_ex, last_tb)])
Plugin Metadata
Several attributes under ctx.plugin can be used to access details about the plugin involved in the current operation.
- ctx.plugin.name - Returns the plugin name, as defined in the application blueprint that imported the involved plugin.
- ctx.plugin.package_name and ctx.plugin.package_version - Return the package name and package version, as defined in the application blueprint that imported the involved plugin.
- ctx.plugin.prefix - Returns the prefix in which the plugin is installed. For local workflows, ctx.plugin.prefix is equivalent to sys.prefix. For remote workflows, if the plugin is installed in the agent package, ctx.plugin.prefix is equivalent to sys.prefix. Otherwise, it returns the prefix in which the plugin is installed. This will be a subdirectory under VIRTUALENV/plugins.
- ctx.plugin.workdir - Returns a work directory that is unique for the current (deployment_id, plugin) pair. This directory can be used in cases in which a plugin must write files to the file system to be read later. (Note that this directory is not migrated during Manager migration, so it should not be considered persistent, but rather a convenient workspace.)
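As one possible use of the work directory, the sketch below stores and reloads a small JSON state file. The helpers save_state and load_state are hypothetical and not part of Cloudify. They take the directory as a plain argument, so in an operation you would pass ctx.plugin.workdir.

```python
import json
import os


def _state_path(workdir, name):
    return os.path.join(workdir, '{0}.json'.format(name))


def save_state(workdir, name, data):
    """Write data as JSON to <workdir>/<name>.json and return the path.

    The file is written to a temporary name first and then renamed,
    so a reader never sees a half-written file.
    """
    path = _state_path(workdir, name)
    tmp_path = path + '.tmp'
    with open(tmp_path, 'w') as f:
        json.dump(data, f)
    os.replace(tmp_path, path)
    return path


def load_state(workdir, name, default=None):
    """Read <workdir>/<name>.json, or return default if it does not exist."""
    try:
        with open(_state_path(workdir, name), 'r') as f:
            return json.load(f)
    except IOError:
        return default
```

Because the work directory is not persistent across Manager migration, load_state returns a default instead of failing when the file is missing.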
Testing Your Plugin
In most cases, the recommendation is to test your plugin's logic using local workflows, and only then run it as part of a Cloudify deployment. The test_utils package in cloudify-plugins-common provides the workflow_test decorator for this purpose. For example:
from cloudify.test_utils import workflow_test


@workflow_test(
    blueprint_path,
    copy_plugin_yaml,
    resources_to_copy,
    temp_dir_prefix,
    init_args,
    inputs,
    input_func_args,
    input_func_kwargs
)
def test_my_task(self, cfy_local):
    pass
Workflow Test Arguments
- blueprint_path - A path to the blueprint to run. This blueprint file is copied to a temporary test directory. This is the only mandatory input.
- copy_plugin_yaml - For use in testing a plugin you created. If you specify this argument as True, the decorator tries to traverse up the directory tree from the test file to find plugin.yaml. If the file is found, it is copied to the root of the temporary test directory (together with the blueprint file). Therefore, the blueprint should import plugin.yaml as if both the blueprint and plugin.yaml are in the same folder.
- resources_to_copy - This argument enables you to pass a list of:
  - File paths that are copied to the root of the temporary test directory.
  - Tuples of the format (source_path, destination_path), where destination_path is relative to the root of the temporary test directory (the entire directory structure is managed by the decorator).
- temp_dir_prefix - If you have a special request for the temporary test directory prefix, supply it here.
- init_args - If you have any specific arguments to pass to the local.init() method, pass them through here.
- inputs - Syntactic sugar for the init_args['inputs'] field.
- input_func_args - If you pass a function name to the inputs, use this argument to specify the arguments to the function.
- input_func_kwargs - If you pass a function name to the inputs, use this argument to specify the kwargs to the function.
The decorator sets up the environment for the test, and injects the environment as the first argument to the function.
For example, if the argument is called cfy_local, you can run executions via cfy_local.execute('install'), or access storage via cfy_local.storage.
Passing Inputs
Passing inputs is not confined to static inputs.
You can pass a function to the inputs argument. The function is called, and its return value is set as the inputs for the init. This is practical when using the same function for several decorator uses, while changing the arguments it receives. Note: You need to handle the injected args and kwargs. For example:
from cloudify.test_utils import workflow_test


def set_inputs(*args, **kwargs):
    inputs = {}
    ...
    return inputs


@workflow_test(some_blue_print_path, inputs=set_inputs)
def test_my_task(self, cfy_local):
    pass
Another option is to pass the name of a method belonging to the test method's class, as a string. The name is passed instead of the method itself because the method does not yet exist when the decorator expression is evaluated, so the name is what gives the decorator access to such methods. For example:
from cloudify.test_utils import workflow_test


class MyClass:

    def set_inputs():
        inputs = {}
        ...
        return inputs

    @workflow_test(some_blue_print_path, inputs='set_inputs')
    def test_my_task(self, cfy_local):
        pass
Context Manager
The decorator functionality also exists as a context manager. However, the following features will not work:
- Using copy_plugin_yaml or passing any relative path in resources_to_copy.
- Passing a path to a function.
Unit Testing
To unit test a specific function that needs a ctx object, you can use cloudify.mocks.MockCloudifyContext, which is provided by cloudify-plugins-common.
Example: Using MockCloudifyContext
Assuming the plugin code is located in my_plugin.py:
from cloudify import ctx
from cloudify.decorators import operation


@operation
def my_operation(**kwargs):
    prop1 = ctx.node.properties['node_property_1']
    ctx.logger.info('node_property_1={0}'.format(prop1))
Then use the following code to call the my_operation operation using a mock context object:
from cloudify.mocks import MockCloudifyContext
from cloudify.state import current_ctx
import my_plugin

props = {'node_property_1': 'value_1'}
mock_ctx = MockCloudifyContext(node_id='test_node_id',
                               node_name='test_node_name',
                               properties=props)
try:
    current_ctx.set(mock_ctx)
    my_plugin.my_operation()
finally:
    current_ctx.clear()
(Note: MockCloudifyContext accepts various additional parameters. Check the documentation for more information.)
Now that the plugin is created, you need to incorporate it in your blueprint. For more information, see the Plugins specification.
Supplementary Information
The Context Object
The ctx context object contains contextual parameters that are mirrored from the blueprint, alongside additional functionality:
Properties Context Objects
- ctx.instance.id - The unique ID of the node's instance.
- ctx.node.properties - The properties of the node as declared under the properties dictionary.
- ctx.instance.runtime_properties - The properties that are assigned to a node's instance at runtime. These properties are either populated by the plugin itself (for instance, an automatically generated port that the plugin exposes when it is run), or are generated prior to the invocation of the plugin (for instance, the IP of the machine the plugin is running on).
Utility Context Objects
- ctx.logger - A Cloudify-specific logging mechanism to send logs back to the Cloudify Manager environment.
- ctx.download_resource - Downloads a specified resource.
- ctx.download_resource_and_render - Downloads a specified resource and renders it according to an optional variables dictionary. The context itself is automatically injected, and available as ctx. A resource with the following content:
    deployment_id: {{ctx.deployment.id}}
    test: {{hello}}
  and {'hello': 'world'} as a template_variables dictionary, is downloaded as a resource with the following content:
    deployment_id: <current_deployment_id>
    test: world
- ctx.get_resource - Reads a resource's data.
- ctx.get_resource_and_render - Reads a resource's data and renders it according to an optional variables dictionary. The context itself is automatically injected, and available as ctx. See the example at ctx.download_resource_and_render.
- ctx.instance.update - Updates the node's runtime properties. This is automatically called each time an operation ends, meaning that it is only useful in the context of a single operation.
Cloud Plugins
The lifecycle start operation should store the following runtime properties for the Compute node instance:
- ip - The IP address of the VM to be accessed by Cloudify Manager.
- networks - A dictionary containing network names as keys and lists of IP addresses as values.
See the Cloudify OpenStack plugin for reference.
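To make the expected shape of these two runtime properties concrete, here is a small sketch. The helper compute_runtime_properties and the sample network names and addresses in the usage note are made up for illustration; this is not taken from the OpenStack plugin.

```python
def compute_runtime_properties(ip, networks):
    """Build the 'ip' and 'networks' runtime properties for a Compute node.

    networks maps a network name to one address or a list of addresses;
    every value is normalized to a list.
    """
    normalized = {}
    for name, addresses in networks.items():
        if isinstance(addresses, str):
            addresses = [addresses]
        normalized[name] = list(addresses)
    return {'ip': ip, 'networks': normalized}
```

For example, compute_runtime_properties('10.0.0.5', {'management': '10.0.0.5', 'app': ['192.168.1.7']}) yields a dictionary whose entries could then be assigned one by one to ctx.instance.runtime_properties in the start operation.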