import os
import json
import requests
from cloudify_rest_client.constants import VisibilityState
from cloudify import constants, utils
from cloudify.state import ctx, workflow_ctx, NotInContext
from cloudify.exceptions import (HttpException,
NonRecoverableError)
from cloudify.cluster import CloudifyClusterClient
[docs]
class NodeInstance(object):
    """
    Represents a deployment node instance.

    An instance of this class contains runtime information retrieved
    from Cloudify's runtime storage as well as the node's state.

    The runtime properties can also be accessed using the mapping syntax:
    ``instance[key]``, ``instance[key] = value``, ``del instance[key]`` and
    ``key in instance`` all operate on the runtime properties.
    """

    def __init__(self,
                 node_instance_id,
                 node_id,
                 runtime_properties=None,
                 state=None,
                 version=None,
                 host_id=None,
                 relationships=None,
                 index=None,
                 scaling_groups=None,
                 system_properties=None):
        self.id = node_instance_id
        self._node_id = node_id
        # wrap a shallow copy, so that changes made through this instance
        # don't add or remove keys in the dict that was passed in
        self._runtime_properties = \
            DirtyTrackingDict((runtime_properties or {}).copy())
        self._state = state
        self._version = version
        self._host_id = host_id
        self._relationships = relationships
        self._index = index
        self._scaling_groups = scaling_groups
        self._system_properties = system_properties

    def get(self, key):
        """Return the runtime property ``key``, or None if it is not set."""
        return self._runtime_properties.get(key)

    def put(self, key, value):
        """Set the runtime property ``key`` to ``value``."""
        self._runtime_properties[key] = value

    def delete(self, key):
        """Delete the runtime property ``key``."""
        del self._runtime_properties[key]

    # mapping-style access; note that __getitem__ is ``get``, so a missing
    # key evaluates to None instead of raising KeyError
    __setitem__ = put
    __getitem__ = get
    __delitem__ = delete

    def __contains__(self, key):
        return key in self._runtime_properties

    @property
    def runtime_properties(self):
        """
        The node instance runtime properties.

        To update the properties, make changes on the returned dict and call
        ``update_node_instance`` with the modified instance.
        """
        return self._runtime_properties

    @runtime_properties.setter
    def runtime_properties(self, new_properties):
        # notify the old object of the changes - trigger a .modifiable check
        self._runtime_properties._set_changed()
        self._runtime_properties = DirtyTrackingDict(new_properties)
        # replacing the whole dict is a change as well: mark the new one dirty
        self._runtime_properties._set_changed()

    @property
    def version(self):
        return self._version

    @property
    def state(self):
        """
        The node instance state.

        To update the node instance state, change this property value and
        call ``update_node_instance`` with the modified instance.
        """
        return self._state

    @state.setter
    def state(self, value):
        self._state = value

    @property
    def dirty(self):
        """Whether the runtime properties dict has been marked as changed."""
        return self._runtime_properties.dirty

    @property
    def host_id(self):
        return self._host_id

    @property
    def node_id(self):
        return self._node_id

    @property
    def relationships(self):
        return self._relationships

    @property
    def index(self):
        return self._index

    @property
    def scaling_groups(self):
        """A dict mapping each scaling group's ``id`` to its ``name``.

        Empty when the instance was created without scaling groups.
        """
        # scaling_groups defaults to None in __init__; iterating None
        # would raise TypeError, so treat it as "no groups"
        return {g['id']: g['name'] for g in self._scaling_groups or ()}

    @property
    def system_properties(self):
        return self._system_properties
[docs]
def get_rest_client(tenant=None, api_token=None):
    """Create a REST client for the manager in context.

    Authentication is chosen in this order: the execution token (sent as
    a header) if there is one, else the given ``api_token``, else the REST
    token.

    :param tenant: optional tenant name to connect as; when empty, the
        tenant name is looked up using ``utils.get_tenant_name``
    :param api_token: optional api_token to authenticate with (instead of
        using REST token)
    :returns: A ``CloudifyClusterClient`` configured to connect to the
        manager in context
    """
    tenant = tenant or utils.get_tenant_name()

    request_headers = {}
    # requests carrying this header are let through maintenance mode
    if utils.get_is_bypass_maintenance():
        request_headers['X-BYPASS-MAINTENANCE'] = 'True'

    auth_token = None
    execution_token = utils.get_execution_token()
    if execution_token:
        # the execution token travels in a header; no token is passed then
        request_headers[constants.CLOUDIFY_EXECUTION_TOKEN_HEADER] = \
            execution_token
    else:
        auth_token = api_token if api_token else utils.get_rest_token()

    client_settings = dict(
        headers=request_headers,
        host=utils.get_manager_rest_service_host(),
        port=utils.get_manager_rest_service_port(),
        tenant=tenant,
        token=auth_token,
        protocol=utils.get_manager_rest_service_protocol(),
        cert=utils.get_local_rest_certificate(),
        kerberos_env=utils.get_kerberos_indication(
            os.environ.get(constants.KERBEROS_ENV_KEY)),
    )
    return CloudifyClusterClient(**client_settings)
def _save_resource(logger, resource, resource_path, target_path):
    """Write ``resource`` to a file and return the path of that file.

    :param logger: logger used to report where the resource was written
    :param resource: the content to write; the file is opened in binary
        mode
    :param resource_path: the path the resource was fetched from; only
        its base name is used, and only when no target path is given
    :param target_path: where to write the resource. When empty, the file
        is created in a folder returned by ``utils.create_temp_folder``
    :returns: the path the resource was written to
    """
    destination = target_path
    if not destination:
        temp_folder = utils.create_temp_folder()
        destination = os.path.join(temp_folder,
                                   os.path.basename(resource_path))
    with open(destination, 'wb') as output_file:
        output_file.write(resource)
    logger.info("Downloaded %s to %s" % (resource_path, destination))
    return destination
[docs]
def download_resource_from_manager(resource_path, logger, target_path=None):
    """
    Fetch a resource from the manager file server and store it in a file.

    :param resource_path: path to resource on the file server
    :param logger: logger to use for info output
    :param target_path: optional target path for the resource
    :returns: path to the downloaded resource
    """
    return _save_resource(
        logger,
        get_resource_from_manager(resource_path),
        resource_path,
        target_path,
    )
[docs]
def download_resource(blueprint_id,
                      deployment_id,
                      tenant_name,
                      resource_path,
                      logger,
                      target_path=None):
    """
    Fetch a resource whose path is relative to a deployment or a blueprint
    from the manager file server, and store it in a file.

    The lookup is delegated to ``get_resource``, which tries the deployment
    folder before the blueprint folder.

    When ``resource_path`` carries a namespace prefix, the namespace is
    stripped from the path, and if the blueprint's plan maps that namespace
    to another blueprint, that blueprint is used for the lookup instead.

    :param blueprint_id: the blueprint id of the blueprint to download the
        resource from
    :param deployment_id: the deployment id of the deployment to download the
        resource from
    :param tenant_name: the resource's tenant
    :param resource_path: path to resource relative to blueprint or deployment
        folder
    :param logger: logger to use for info output
    :param target_path: optional target path for the resource
    :returns: path to the downloaded resource
    """
    if _is_resource_origin_from_imported_blueprint(resource_path):
        # the resource belongs to a namespaced (imported) blueprint
        namespace, resource_path = _extract_resource_parts(resource_path)
        blueprint = get_rest_client().blueprints.get(blueprint_id, ['plan'])
        mapping = blueprint.plan['namespaces_mapping']
        blueprint_id = \
            mapping[namespace] if namespace in mapping else blueprint_id

    content = get_resource(
        blueprint_id, deployment_id, tenant_name, resource_path)
    return _save_resource(logger, content, resource_path, target_path)
def _is_resource_origin_from_imported_blueprint(resource_path):
    """Tell whether ``resource_path`` contains the namespace delimiter."""
    delimiter = constants.NAMESPACE_BLUEPRINT_IMPORT_DELIMITER
    return delimiter in resource_path
def _extract_resource_parts(resource_path):
    """
    Split a namespaced resource path into its namespace and actual path.

    A namespaced path has the structure
    <namespace><namespace-delimiter><the actual path>. The split is made
    at the last occurrence of the delimiter; if there is no delimiter,
    the namespace is an empty string.

    :returns: a (namespace, path) tuple
    """
    delimiter = constants.NAMESPACE_BLUEPRINT_IMPORT_DELIMITER
    pieces = resource_path.rpartition(delimiter)
    # rpartition gives (head, separator, tail); the separator is dropped
    return pieces[0], pieces[2]
[docs]
def get_resource_from_manager(resource_path,
                              base_url=None,
                              base_urls=None):
    """Get resource from the manager file server.

    The candidate base URLs are tried in order, and the content of the
    first successful response is returned.

    :param resource_path: path to resource on the file server
    :param base_url: The base URL to manager file server. Deprecated.
        When given, it is tried before all the other URLs.
    :param base_urls: A list of base URL to cluster manager file servers.
        The URLs returned by ``utils.get_manager_file_server_url`` are
        tried after these. The list passed in is not modified.
    :returns: resource content
    :raises HttpException: if the last URL responded with an unsuccessful
        status
    :raises NonRecoverableError: if all URLs were tried without getting a
        response to return or report (eg. connecting to the last one
        failed), or if there were no URLs to try
    """
    # build a new list rather than extending/inserting into the caller's
    # one, so that repeated calls with the same list don't keep growing it
    candidate_urls = list(base_urls or [])
    candidate_urls += utils.get_manager_file_server_url()
    if base_url is not None:
        candidate_urls.insert(0, base_url)

    # if we have multiple managers to try, set connect_timeout so that
    # we're not waiting forever for a single non-responding manager
    if len(candidate_urls) > 1:
        timeout = (10, None)
    else:
        timeout = None

    verify = utils.get_local_rest_certificate()
    headers = {}
    # the execution token comes from the operation context, or - when not
    # in an operation - from the workflow context
    try:
        headers[constants.CLOUDIFY_EXECUTION_TOKEN_HEADER] = \
            ctx.execution_token
    except NotInContext:
        headers[constants.CLOUDIFY_EXECUTION_TOKEN_HEADER] = \
            workflow_ctx.execution_token

    last_index = len(candidate_urls) - 1
    for ix, next_url in enumerate(candidate_urls):
        url = '{0}/{1}'.format(next_url.rstrip('/'), resource_path.lstrip('/'))
        try:
            response = requests.get(
                url, verify=verify, headers=headers, timeout=timeout)
        except requests.ConnectionError:
            # this manager is unreachable; move on to the next one
            continue
        if not response.ok:
            if ix != last_index:
                # if there's more managers to try, try them: due to filesystem
                # replication lag, they might have files that the previous
                # manager didn't
                continue
            raise HttpException(url, response.status_code, response.reason)
        return response.content

    raise NonRecoverableError(
        'Failed to download {0}: unable to connect to any manager (tried: {1})'
        .format(resource_path, ', '.join(candidate_urls))
    )
def _resource_paths(blueprint_id, deployment_id, tenant_name, resource_path,
                    use_global=True):
    """For the given resource_path, generate all fileserver paths to try.

    Eg. for path of "foo.txt", generate:
        - /resources/deployments/default_tenant/dep1/foo.txt
        - /resources/blueprints/default_tenant/bp1/foo.txt
        - /foo.txt

    The deployment path is only generated when ``deployment_id`` is set,
    the blueprint path only when ``blueprint_id`` is set, and the bare
    ``resource_path`` only when ``use_global`` is true. For a blueprint
    with global visibility, the blueprint's own tenant name is used in
    the path instead of ``tenant_name``.
    """
    def _fileserver_path(*segments):
        # join using the local OS rules, then normalize to forward slashes
        return os.path.join(*segments).replace('\\', '/')

    if deployment_id:
        yield _fileserver_path(
            constants.FILE_SERVER_DEPLOYMENTS_FOLDER,
            tenant_name,
            deployment_id,
            resource_path,
        )

    if blueprint_id:
        blueprint = get_rest_client().blueprints.get(blueprint_id)
        if blueprint['visibility'] == VisibilityState.GLOBAL:
            tenant_name = blueprint['tenant_name']
        yield _fileserver_path(
            constants.FILE_SERVER_BLUEPRINTS_FOLDER,
            tenant_name,
            blueprint_id,
            resource_path,
        )

    if use_global:
        yield resource_path
[docs]
def get_resource(blueprint_id, deployment_id, tenant_name, resource_path):
    """
    Fetch the content of a resource whose path is relative to a deployment
    or a blueprint from the manager file server.

    The candidate locations come from ``_resource_paths`` and are tried
    in the order it generates them: the deployment folder, then the
    blueprint folder, then the bare ``resource_path``. The first location
    that can be fetched wins.

    :param blueprint_id: the blueprint id of the blueprint to download
        the resource from
    :param deployment_id: the deployment id of the deployment to download the
        resource from
    :param tenant_name: tenant name
    :param resource_path: path to resource relative to blueprint folder
    :returns: resource content
    :raises HttpException: with code 404 if none of the locations had the
        resource
    """
    attempted = []
    candidates = _resource_paths(
        blueprint_id, deployment_id, tenant_name, resource_path)
    for candidate in candidates:
        try:
            content = get_resource_from_manager(candidate)
        # NOTE(review): if HttpException derives from NonRecoverableError,
        # this first clause also catches it and the status check below is
        # never reached - confirm against cloudify.exceptions
        except NonRecoverableError:
            attempted.append(candidate)
        except HttpException as error:
            if error.code != 404:
                raise
            attempted.append(candidate)
        else:
            return content

    not_found_message = 'Resource not found: {0}'.format(resource_path)
    raise HttpException(','.join(attempted), 404, not_found_message)
[docs]
def get_resource_directory_index(
        blueprint_id, deployment_id, tenant_name, resource_path):
    """
    List the files of a resource directory on the manager file server.

    The directory index is fetched from the deployment folder and from
    the blueprint folder (the bare ``resource_path`` is not tried). Each
    index is expected to be a JSON object; the keys of all the indexes
    that could be fetched are merged.

    :param blueprint_id: the blueprint id of the blueprint to look in
    :param deployment_id: the deployment id of the deployment to look in
    :param tenant_name: tenant name
    :param resource_path: path to the directory, relative to the blueprint
        or deployment folder
    :returns: a list of the names found, without duplicates and in no
        particular order
    :raises HttpException: with code 404 if no names were found in any
        location; a response with another error status is re-raised as is
    """
    tried_paths = []
    resource_files = set()
    for path in _resource_paths(
            blueprint_id, deployment_id, tenant_name, resource_path,
            use_global=False):
        try:
            directory_index = get_resource_from_manager(path)
            directory_index_dict = json.loads(directory_index)
        except (ValueError, KeyError, NonRecoverableError):
            tried_paths.append(path)
            continue
        except HttpException as e:
            if e.code != 404:
                raise
            # a missing index is still a path that was tried: record it,
            # so that the error raised below reports it
            tried_paths.append(path)
            continue
        if not isinstance(directory_index_dict, dict):
            # valid JSON, but not a directory listing (eg. a list or a
            # number): there are no keys to collect
            tried_paths.append(path)
            continue
        resource_files.update(directory_index_dict)

    if not resource_files:
        raise HttpException(','.join(tried_paths), 404,
                            'No valid resource directory listing at found: {0}'
                            .format(resource_path))
    return list(resource_files)
[docs]
def get_node_instance(node_instance_id, evaluate_functions=False, client=None):
    """
    Fetch a node instance using the REST client, and wrap it in
    a NodeInstance.

    :param node_instance_id: the node instance id
    :param evaluate_functions: Evaluate intrinsic functions
    :param client: a REST client to use; when None, one is created with
        ``get_rest_client``
    :rtype: NodeInstance
    """
    rest_client = get_rest_client() if client is None else client
    stored = rest_client.node_instances.get(
        node_instance_id,
        evaluate_functions=evaluate_functions,
    )
    node_id = stored.node_id
    # these are passed to NodeInstance under the same names
    copied_fields = (
        'runtime_properties',
        'state',
        'version',
        'host_id',
        'relationships',
        'index',
        'scaling_groups',
        'system_properties',
    )
    details = {}
    for field in copied_fields:
        details[field] = getattr(stored, field)
    return NodeInstance(node_instance_id, node_id, **details)
[docs]
def update_node_instance(node_instance, client=None):
    """
    Send the node instance's state, runtime properties and version to the
    storage, using the REST client.

    :param node_instance: the node instance with the updated data
    :param client: a REST client to use; when None, one is created with
        ``get_rest_client``
    """
    rest_client = client if client is not None else get_rest_client()
    send_update = rest_client.node_instances.update
    send_update(
        node_instance.id,
        state=node_instance.state,
        runtime_properties=node_instance.runtime_properties,
        version=node_instance.version,
    )
[docs]
def get_node_instance_ip(node_instance_id, client=None):
    """
    Get the IP address of the host the node instance denoted by
    ``node_instance_id`` is contained in.

    The IP is taken from the ``ip`` runtime property of the host node
    instance, or - if that is not set - from the ``ip`` property of the
    host's node.

    :param node_instance_id: the node instance id
    :param client: a REST client to use; when None, one is created with
        ``get_rest_client``
    :returns: the IP of the host
    :raises NonRecoverableError: if the node instance has no host_id, or
        if the host has no IP in either place
    """
    if client is None:
        client = get_rest_client()
    instance = client.node_instances.get(node_instance_id)
    if instance.host_id is None:
        # mind the trailing space: adjacent literals are joined as is
        raise NonRecoverableError('node instance: {0} is missing host_id '
                                  'property'.format(instance.id))
    if node_instance_id != instance.host_id:
        # the instance is contained in another one: look at that host
        instance = client.node_instances.get(instance.host_id)
    if instance.runtime_properties.get('ip'):
        return instance.runtime_properties['ip']
    node = client.nodes.get(instance.deployment_id, instance.node_id)
    if node.properties.get('ip'):
        return node.properties['ip']
    raise NonRecoverableError('could not find ip for node instance: {0} with '
                              'host id: {1}'.format(node_instance_id,
                                                    instance.id))
[docs]
def update_execution_status(execution_id, status, error=None, client=None):
    """
    Set the status of the execution denoted by ``execution_id``, using
    the REST client.

    :param execution_id: the id of the execution to update
    :param status: the new status
    :param error: optional error, passed along with the status
    :param client: a REST client to use; when None, one is created with
        ``get_rest_client``
    :returns: The updated status
    """
    rest_client = get_rest_client() if client is None else client
    return rest_client.executions.update(execution_id, status, error)
[docs]
def get_bootstrap_context(client=None):
    """Read the manager bootstrap context.

    This is the ``cloudify`` section of the manager context (an empty dict
    if there is none), with the manager's workflow-scoped config entries
    merged into its ``workflows`` section.

    :param client: a REST client to use; when None, one is created with
        ``get_rest_client``
    """
    rest_client = get_rest_client() if client is None else client
    manager_context = rest_client.manager.get_context()['context']
    bootstrap_context = manager_context.get('cloudify', {})
    workflows = bootstrap_context.setdefault('workflows', {})
    for entry in rest_client.manager.get_config(scope='workflow'):
        workflows[entry.name] = entry.value
    return bootstrap_context
[docs]
def get_provider_context(client=None):
    """Read the manager provider context.

    :param client: a REST client to use; when None, one is created with
        ``get_rest_client``
    :returns: the ``context`` entry of the manager context
    """
    rest_client = get_rest_client() if client is None else client
    return rest_client.manager.get_context()['context']
class DirtyTrackingDict(dict):
    """A dict that records whether it was changed after it was created.

    ``dirty`` is False after construction, and is set to True by each of
    the mutating methods overridden here (note that ``setdefault`` is not
    one of them).

    When ``modifiable`` is set to False, those methods raise
    NonRecoverableError. The check runs after the underlying dict
    operation, so the change itself is not undone.
    """

    def __init__(self, *args, **kwargs):
        super(DirtyTrackingDict, self).__init__(*args, **kwargs)
        self.modifiable = True
        self.dirty = False

    def __setitem__(self, key, value):
        r = super(DirtyTrackingDict, self).__setitem__(key, value)
        self._set_changed()
        return r

    def __delitem__(self, key):
        r = super(DirtyTrackingDict, self).__delitem__(key)
        self._set_changed()
        return r

    def update(self, E=None, **F):
        """Like ``dict.update``: ``E`` is a mapping or an iterable of
        key/value pairs, ``F`` are additional keys given as keywords.
        """
        if E is None:
            # dict.update(None) raises TypeError, so a keyword-only call
            # must not forward the placeholder default
            r = super(DirtyTrackingDict, self).update(**F)
        else:
            r = super(DirtyTrackingDict, self).update(E, **F)
        self._set_changed()
        return r

    def clear(self):
        r = super(DirtyTrackingDict, self).clear()
        self._set_changed()
        return r

    def pop(self, k, d=None):
        # unlike dict.pop, a default is always passed on: a missing key
        # returns None (or ``d``) rather than raising KeyError
        r = super(DirtyTrackingDict, self).pop(k, d)
        self._set_changed()
        return r

    def popitem(self):
        r = super(DirtyTrackingDict, self).popitem()
        self._set_changed()
        return r

    def _set_changed(self):
        """Mark the dict as changed, or raise if it must not be changed."""
        # python 2.6 doesn't have modifiable during copy.deepcopy
        if hasattr(self, 'modifiable') and not self.modifiable:
            raise NonRecoverableError('Cannot modify runtime properties of'
                                      ' relationship node instances')
        self.dirty = True
def _get_workdir_path(deployment_id, tenant):
    """Build the path of a deployment's folder under the manager's
    resources directory, for the given tenant.
    """
    segments = (
        '/opt', 'manager', 'resources', 'deployments',
        tenant, deployment_id,
    )
    return os.path.join(*segments)