import warnings
from cloudify.models_states import ExecutionState
from cloudify_rest_client import utils
from cloudify_rest_client.exceptions import CloudifyClientError
from cloudify_rest_client.responses import ListResponse
class Execution(dict):
    """A Cloudify workflow execution, as returned by the REST service.

    Behaves like a plain ``dict`` of the execution's fields, with
    read-only properties for convenient access to the common ones.
    """

    # Possible values of the ``status`` field.
    TERMINATED = 'terminated'
    FAILED = 'failed'
    CANCELLED = 'cancelled'
    PENDING = 'pending'
    STARTED = 'started'
    CANCELLING = 'cancelling'
    FORCE_CANCELLING = 'force_cancelling'
    KILL_CANCELLING = 'kill_cancelling'
    QUEUED = 'queued'
    SCHEDULED = 'scheduled'
    # Statuses from which an execution can no longer progress.
    END_STATES = [TERMINATED, FAILED, CANCELLED]

    def __init__(self, execution):
        self.update(execution)

    @property
    def id(self):
        """The execution's id."""
        return self.get('id')

    @property
    def deployment_id(self):
        """Id of the deployment this execution is related to."""
        return self.get('deployment_id')

    @property
    def blueprint_id(self):
        """Id of the related deployment's main blueprint."""
        return self.get('blueprint_id')

    @property
    def status(self):
        """The execution's current status."""
        return self.get('status')

    @property
    def status_display(self):
        """Human-readable form of the execution's status."""
        return self.get('status_display')

    @property
    def error(self):
        """The execution error in a case of failure, otherwise None."""
        return self.get('error')

    @property
    def workflow_id(self):
        """Id of the workflow this execution represents."""
        return self.get('workflow_id')

    @property
    def parameters(self):
        """The execution's parameters (an empty dict if unset)."""
        return self.get('parameters') or {}

    @property
    def is_system_workflow(self):
        """True if the executed workflow is a system workflow."""
        return self.get('is_system_workflow', False)

    @property
    def created_at(self):
        """Time the execution was created."""
        return self.get('created_at')

    @property
    def started_at(self):
        """Time the execution started running."""
        return self.get('started_at')

    @property
    def ended_at(self):
        """Time the execution finished."""
        return self.get('ended_at')

    @property
    def created_by(self):
        """Name of the user who created the execution."""
        return self.get('created_by')

    @property
    def scheduled_for(self):
        """Time this execution is scheduled for, if any."""
        return self.get('scheduled_for')

    @property
    def is_dry_run(self):
        """True if the execution was performed as a dry run."""
        return self.get('is_dry_run', False)

    @property
    def total_operations(self):
        """Total count of operations in this execution.

        NOTE(review): when the field is missing this defaults to
        ``False`` rather than 0 or None — confirm callers depend on
        this before changing it.
        """
        return self.get('total_operations', False)

    @property
    def finished_operations(self):
        """Count of finished operations in this execution.

        NOTE(review): defaults to ``False`` when missing, same as
        ``total_operations``.
        """
        return self.get('finished_operations', False)
class ExecutionGroup(dict):
    """A group of executions started together, as returned by the API.

    Behaves like a plain ``dict`` of the group's fields, with
    convenience accessors for the common ones.
    """

    def __init__(self, group):
        super(ExecutionGroup, self).__init__()
        self.update(group)

    @property
    def id(self):
        """The ID of this group (raises KeyError if absent)."""
        return self['id']

    @property
    def execution_ids(self):
        """IDs of the executions belonging to this group."""
        return self.get('execution_ids')

    @property
    def status(self):
        """Group status, based on the status of each execution."""
        return self.get('status')

    @property
    def deployment_group_id(self):
        """The deployment group this execution group was started from."""
        return self.get('deployment_group_id')

    @property
    def workflow_id(self):
        """The workflow this execution group is running."""
        return self.get('workflow_id')

    @property
    def concurrency(self):
        """How many executions the group runs at a time."""
        return self.get('concurrency')
class ExecutionGroupsClient(object):
    """Client for the ``/execution-groups`` REST endpoints."""

    def __init__(self, api):
        self.api = api

    def list(self, _include=None, **kwargs):
        """List execution groups.

        :param _include: List of fields to include in the response.
        :param kwargs: Optional filter fields, passed as query params.
        :return: A ListResponse of ExecutionGroup items.
        """
        response = self.api.get('/execution-groups', params=kwargs,
                                _include=_include)
        groups = [ExecutionGroup(item) for item in response['items']]
        return ListResponse(groups, response['metadata'])

    def get(self, execution_group_id, _include=None):
        """Get a single execution group by its id.

        :param execution_group_id: ID of the execution group.
        :param _include: List of fields to include in the response.
        :return: The ExecutionGroup.
        """
        response = self.api.get(
            '/execution-groups/{0}'.format(execution_group_id),
            _include=_include,
        )
        return ExecutionGroup(response)

    def create(self, deployment_group_id, workflow_id, executions,
               force=False, default_parameters=None, parameters=None,
               concurrency=5, created_by=None, created_at=None,
               id=None):
        """Create an exec group without running it.

        Internal use only.
        """
        if not executions:
            raise RuntimeError('Executions must be provided when '
                               'creating an exec group without running it.')
        args = {
            'id': id,
            'force': force,
            'deployment_group_id': deployment_group_id,
            'workflow_id': workflow_id,
            'parameters': parameters,
            'default_parameters': default_parameters,
            'concurrency': concurrency,
            'associated_executions': executions,
        }
        # Audit fields are only sent when explicitly supplied.
        if created_by:
            args['created_by'] = created_by
        if created_at:
            args['created_at'] = created_at
        return ExecutionGroup(self.api.post('/execution-groups', data=args))

    def start(self, deployment_group_id, workflow_id, force=False,
              default_parameters=None, parameters=None,
              concurrency=5):
        """Start an execution group from a deployment group.

        :param deployment_group_id: start an execution for every deployment
            belonging to this deployment group
        :param workflow_id: the workflow to run
        :param force: force concurrent execution
        :param default_parameters: default parameters for every execution
        :param parameters: a dict of {deployment_id: params_dict}, overrides
            the default parameters on a per-deployment basis
        :param concurrency: run this many executions at a time
        """
        data = {
            'force': force,
            'deployment_group_id': deployment_group_id,
            'workflow_id': workflow_id,
            'parameters': parameters,
            'default_parameters': default_parameters,
            'concurrency': concurrency
        }
        return ExecutionGroup(self.api.post('/execution-groups', data=data))

    def cancel(self, execution_group_id, force=False, kill=False):
        """Cancel the executions in this group.

        This cancels every non-queued execution according to the params,
        see executions.cancel for their semantics.
        Queued executions are marked cancelled immediately.
        """
        if kill:
            action = 'kill'
        elif force:
            action = 'force-cancel'
        else:
            action = 'cancel'
        response = self.api.post(
            '/execution-groups/{0}'.format(execution_group_id),
            data={'action': action})
        return ExecutionGroup(response)

    def resume(self, execution_group_id, force=False):
        """Resume the executions in this group."""
        action = 'force-resume' if force else 'resume'
        response = self.api.post(
            '/execution-groups/{0}'.format(execution_group_id),
            data={'action': action})
        return ExecutionGroup(response)

    def set_target_group(self, execution_group_id,
                         success_group=None, failed_group=None):
        """Set the success or failure target group for this execution-group.

        Deployments that have executions in this execution-group which
        terminated successfully, will be added to the success group.
        Deployments that have executions in this execution-group which
        failed, will be added to the failure group.
        Cancelled executions have no effect.

        :param execution_group_id: ID of the execution group
        :param success_group: ID of the target success deployment group
        :param failed_group: ID of the target failure deployment group
        :return: The updated ExecutionGroup
        """
        response = self.api.patch(
            '/execution-groups/{0}'.format(execution_group_id),
            data={
                'success_group_id': success_group,
                'failure_group_id': failed_group,
            }
        )
        return ExecutionGroup(response)

    def set_concurrency(self, execution_group_id, concurrency):
        """Change the concurrency setting of an execution-group.

        This affects the de-queueing mechanism: when starting queued
        executions, the new concurrency setting will be used.

        :param execution_group_id: ID of the execution group
        :param concurrency: the new concurrency setting, a natural number
        :return: The updated ExecutionGroup
        """
        response = self.api.patch(
            '/execution-groups/{0}'.format(execution_group_id),
            data={
                'concurrency': concurrency,
            },
        )
        return ExecutionGroup(response)

    def dump(self, execution_group_ids=None):
        """Generate execution groups' attributes for a snapshot.

        :param execution_group_ids: A list of execution groups' identifiers,
            if not empty, used to select specific execution groups to be
            dumped.
        :returns: A generator of dictionaries, which describe execution
            groups' attributes.
        """
        entities = utils.get_all(
            self.api.get,
            '/execution-groups',
            params={'_get_data': True},
            _include=['id', 'created_at', 'workflow_id', 'execution_ids',
                      'concurrency', 'deployment_group_id', 'created_by'],
        )
        if execution_group_ids:
            return (e for e in entities if e['id'] in execution_group_ids)
        return entities

    def restore(self, entities, logger):
        """Restore execution groups from a snapshot.

        :param entities: An iterable (e.g. a list) of dictionaries describing
            execution groups to be restored.
        :param logger: A logger instance.
        """
        for entity in entities:
            # The dump uses 'execution_ids'; create() expects 'executions'.
            entity['executions'] = entity.pop('execution_ids')
            try:
                self.create(**entity)
            except (CloudifyClientError, RuntimeError) as exc:
                logger.error("Error restoring execution group "
                             f"{entity['id']}: {exc}")
class ExecutionsClient(object):
    """Client for the ``/executions`` REST endpoints."""

    def __init__(self, api):
        self.api = api
        # Shared by all URI-building methods; subclass hooks.
        self._uri_prefix = 'executions'
        self._wrapper_cls = Execution

    def _create_filters(
        self,
        deployment_id=None,
        include_system_workflows=False,
        sort=None,
        is_descending=False,
        **kwargs
    ):
        """Build the query-params dict used by ``list``.

        :param deployment_id: Optional deployment id to filter by.
        :param include_system_workflows: Include system-workflow executions.
        :param sort: Key for sorting; prefixed with '-' when descending.
        :param is_descending: Sort in descending order.
        :param kwargs: Additional filter fields, passed through as-is.
        :return: The query-params dict.
        """
        params = {'_include_system_workflows': include_system_workflows}
        if deployment_id:
            params['deployment_id'] = deployment_id
        params.update(kwargs)
        if sort:
            params['_sort'] = '-' + sort if is_descending else sort
        return params

    def should_start(self, execution_id):
        """
        Check if an execution can currently start running (no system
        executions / executions under the same deployment are currently
        running).

        :param execution_id: Id of the execution that needs to be checked.
        :return: Whether or not this execution can currently start
        """
        assert execution_id
        uri = '/{self._uri_prefix}/{id}/should-start'.format(
            self=self, id=execution_id)
        return self.api.get(uri)

    def list(self, _include=None, **kwargs):
        """Returns a list of executions.

        :param deployment_id: Optional deployment id to get executions for.
        :param include_system_workflows: Include executions of system
            workflows
        :param _include: List of fields to include in response.
        :param sort: Key for sorting the list.
        :param is_descending: True for descending order, False for ascending.
        :param kwargs: Optional filter fields. For a list of available fields
            see the REST service's models.Execution.fields
        :return: Executions list.
        """
        params = self._create_filters(**kwargs)
        response = self.api.get(
            '/{self._uri_prefix}'.format(self=self),
            params=params,
            _include=_include)
        return ListResponse(
            [self._wrapper_cls(item) for item in response['items']],
            response['metadata']
        )

    def get(self, execution_id, _include=None):
        """Get execution by its id.

        :param execution_id: Id of the execution to get.
        :param _include: List of fields to include in response.
        :return: Execution.
        """
        assert execution_id
        uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id)
        response = self.api.get(uri, _include=_include)
        return self._wrapper_cls(response)

    def update(self, execution_id, status, error=None):
        """Update execution with the provided status and optional error.

        :param execution_id: Id of the execution to update.
        :param status: Updated execution status.
        :param error: Updated execution error (optional).
        :return: Updated execution.
        """
        # Use the shared prefix and wrapper class for consistency with the
        # other methods (the URI was previously hard-coded to '/executions').
        uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id)
        params = {'status': status}
        if error:
            params['error'] = error
        response = self.api.patch(uri, data=params)
        return self._wrapper_cls(response)

    def start(self, *args, **kwargs):
        """Starts a deployment's workflow execution whose id is provided.

        :param deployment_id: The deployment's id to execute a workflow for.
        :param workflow_id: The workflow to be executed id.
        :param parameters: Parameters for the workflow execution.
        :param allow_custom_parameters: Determines whether to allow\
            parameters which weren't defined in the workflow parameters\
            schema in the blueprint.
        :param force: Determines whether to force the execution of the\
            workflow in a case where there's an already running execution\
            for this deployment.
        :param dry_run: If set to true, no actual actions will be performed.\
            This is a dry run of the execution
        :param queue: If set, blocked executions will be queued and
            automatically run when possible
        :param schedule: A string representing the date and time this
            workflow should be executed at. If not passed this workflow will
            be executed immediately.
        :raises: IllegalExecutionParametersError
        :return: The created execution.
        """
        return self.create(*args, **kwargs)

    def create(self, deployment_id, workflow_id, parameters=None,
               allow_custom_parameters=False, force=False, dry_run=False,
               queue=False, schedule=None, force_status=None,
               created_by=None, created_at=None, started_at=None,
               ended_at=None, execution_id=None, wait_after_fail=600,
               is_system_workflow=None, error=None):
        """Creates an execution on a deployment.

        If force_status is provided, the execution will not be started.
        Otherwise, parameters and return value are identical to 'start'.
        """
        if schedule:
            # Fixed misplaced backtick in the suggested CLI command.
            warnings.warn("The 'schedule' flag is deprecated. Please use "
                          "`cfy deployments schedule create` instead",
                          DeprecationWarning)
        # Booleans are sent as lowercase strings, as the REST API expects.
        data = {
            'deployment_id': deployment_id,
            'workflow_id': workflow_id,
            'parameters': parameters,
            'allow_custom_parameters': str(allow_custom_parameters).lower(),
            'force': str(force).lower(),
            'dry_run': str(dry_run).lower(),
            'queue': str(queue).lower(),
            'scheduled_time': schedule,
            'wait_after_fail': wait_after_fail,
            'force_status': force_status,
            'created_by': created_by,
            'created_at': created_at,
            'started_at': started_at,
            'ended_at': ended_at,
            'id': execution_id,
            'is_system_workflow': is_system_workflow,
            'error': error,
        }
        # Consistent with the other methods (was hard-coded '/executions').
        uri = '/{self._uri_prefix}'.format(self=self)
        response = self.api.post(uri,
                                 data=data,
                                 expected_status_code=201)
        return self._wrapper_cls(response)

    def cancel(self, execution_id, force=False, kill=False):
        """Cancels an execution.

        :param execution_id: id of the execution to cancel
        :param force: force-cancel the execution: does not wait for the
            workflow function to return
        :param kill: kill the workflow process and the operation processes
        :return: Cancelled execution.
        """
        uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id)
        action = 'kill' if kill else 'force-cancel' if force else 'cancel'
        response = self.api.post(uri,
                                 data={'action': action},
                                 expected_status_code=200)
        return self._wrapper_cls(response)

    def resume(self, execution_id, force=False):
        """Resume an execution.

        :param execution_id: Id of the execution to resume.
        :param force: Whether to resume failed/cancelled executions by
            retrying their failed tasks.
        :return: Resumed execution.
        """
        uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id)
        action = 'force-resume' if force else 'resume'
        response = self.api.post(uri,
                                 data={'action': action},
                                 expected_status_code=200)
        return self._wrapper_cls(response)

    def requeue(self, execution_id):
        """
        Requeue an execution (e.g. after snapshot restore).

        :param execution_id: Id of the execution to be requeued.
        :return: Requeued execution.
        """
        uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id)
        response = self.api.post(uri,
                                 data={'action': 'requeue'},
                                 expected_status_code=200)
        return self._wrapper_cls(response)

    def delete(self, to_datetime=None, keep_last=None, **kwargs):
        """Deletes finished executions from the DB.

        :param to_datetime: Until which timestamp to delete executions
        :param keep_last: How many most recent executions to keep from
            deletion
        :param kwargs: Optional filter fields. For a list of available fields
            see the REST service's models.Execution.fields
        :return: Count of deleted executions.

        Parameters `to_datetime` and `keep_last` are mutually-exclusive.
        NOTE(review): mutual exclusivity is not validated client-side;
        presumably the server rejects requests carrying both — confirm.
        """
        data = {}
        if to_datetime:
            data['to_datetime'] = to_datetime.isoformat()
        if keep_last:
            data['keep_last'] = keep_last
        response = self.api.delete('/{self._uri_prefix}'.format(self=self),
                                   data=data,
                                   params=kwargs,
                                   expected_status_code=200)
        return response['items'][0]['count']

    def dump(self, execution_ids=None):
        """Generate executions' attributes for a snapshot.

        :param execution_ids: A list of executions' identifiers, if not
            empty, used to select specific executions to be dumped.
        :returns: A generator of dictionaries, which describe executions'
            attributes.
        """
        entities = utils.get_all(
            self.api.get,
            f'/{self._uri_prefix}',
            _include=['deployment_id', 'workflow_id', 'parameters',
                      'is_dry_run', 'allow_custom_parameters', 'status',
                      'created_by', 'created_at', 'id', 'started_at',
                      'ended_at', 'error', 'is_system_workflow'],
            params={
                '_get_data': True,
                '_include_system_workflows': True,
            },
        )
        if not execution_ids:
            return entities
        return (e for e in entities if e['id'] in execution_ids)

    def restore(self, entities, logger):
        """Restore executions from a snapshot.

        :param entities: An iterable (e.g. a list) of dictionaries describing
            executions to be restored.
        :param logger: A logger instance.
        """
        for entity in entities:
            # Rename/remap dumped fields to the names create() expects.
            entity['execution_id'] = entity.pop('id')
            entity['force_status'], entity['error'] = \
                restore_status_error_mapped(
                    entity.pop('status'),
                    entity.pop('error'),
                )
            entity['dry_run'] = entity.pop('is_dry_run')
            entity['deployment_id'] = entity['deployment_id'] or ''
            try:
                self.create(**entity)
            except CloudifyClientError as exc:
                logger.error("Error restoring execution "
                             f"{entity['execution_id']}: {exc}")
def restore_status_error_mapped(status, error):
    """Map a dumped execution's status/error pair for restoring.

    Executions dumped while still in progress cannot be resumed, so they
    are restored as cancelled with an explanatory error message; any
    other status is passed through unchanged.

    :param status: The dumped execution status.
    :param error: The dumped execution error.
    :return: A (status, error) tuple to restore with.
    """
    if status not in ExecutionState.IN_PROGRESS_STATES:
        return status, error
    return (
        ExecutionState.CANCELLED,
        "Marked as cancelled by snapshot restore",
    )