Source code for freckles.frecklet.vars

# -*- coding: utf-8 -*-
import abc
import logging
import os
import re
import secrets
import sys
from collections import Sequence
from typing import Mapping

from ruamel.yaml.comments import CommentedMap, CommentedSeq
from six import string_types

import click
import six
from colorama import Style
from stevedore import ExtensionManager

from freckles.exceptions import FrecklesVarException
from frutils import dict_merge
from frutils.exceptions import FrklException
from frutils.parameters import VarsTypeSimple
from ting.ting_attributes import TingAttribute
from ting.ting_cast import TingCast

log = logging.getLogger("freckles")


FRECKLES_CLICK_CEREBUS_ARG_MAP = {
    "string": str,
    "float": float,
    "integer": int,
    "boolean": bool,
    "dict": VarsTypeSimple(),
    # "password": str,
    # "list": list
}


# extensions
# ------------------------------------------------------------------------
[docs]def load_var_adapters(): """Loading a dictlet finder extension. Returns: ExtensionManager: the extension manager holding the extensions """ log2 = logging.getLogger("stevedore") out_hdlr = logging.StreamHandler(sys.stdout) out_hdlr.setFormatter(logging.Formatter("load adapter plugin error -> %(message)s")) out_hdlr.setLevel(logging.DEBUG) log2.addHandler(out_hdlr) log2.setLevel(logging.INFO) log.debug("Loading var adapters...") mgr = ExtensionManager( namespace="freckles.var_adapters", invoke_on_load=True, propagate_map_exceptions=True, ) result = {} for plugin in mgr: name = plugin.name ep = plugin.entry_point adapter = ep.load() result[name] = adapter() return result
VAR_ADAPTER_REGEX = re.compile("(::[a-z_1-9]+::)", re.RegexFlag.MULTILINE)
[docs]def is_var_adapter(value): if isinstance(value, string_types): m = re.findall(VAR_ADAPTER_REGEX, value) return len(m) > 0 elif isinstance(value, Sequence): for v in value: if is_var_adapter(v): return True elif isinstance(value, Mapping): for k, v in value.items(): if is_var_adapter(k) or is_var_adapter(v): return True return False
[docs]def get_resolved_var_adapter_object( value, key, arg, root_arg=True, frecklet=None, is_secret=None, inventory=None ): """Replaces string in object recursively, non-jinja-termplate version. Args: value: the object key: the string to be replaced repl: the replacement string Returns: tuple: the replaced object, and a boolean that indicates whether there was any change or not """ changed_global = False is_sec = False if isinstance(value, Mapping): result = CommentedMap() for k, v in value.items(): repl_value, sec, changed = get_resolved_var_adapter_object( value=v, key=k, arg=arg, root_arg=False, frecklet=frecklet, is_secret=is_secret, inventory=inventory, ) if changed: changed_global = True if sec: is_sec = True result[k] = repl_value elif isinstance(value, (list, tuple, CommentedSeq)): result = [] for v in value: repl_value, sec, changed = get_resolved_var_adapter_object( value=v, key=key, arg=arg, root_arg=False, frecklet=frecklet, is_secret=is_secret, inventory=inventory, ) if changed: changed_global = True if sec: is_sec = True result.append(repl_value) elif isinstance(value, string_types): result, is_sec, changed_global = get_value_from_var_adapter_string( value, key=key, arg=arg, root_arg=root_arg, frecklet=frecklet, is_secret=is_secret, inventory=inventory, ) else: result = value is_sec = is_secret changed_global = False return result, is_sec, changed_global
[docs]def get_value_from_var_adapter_string( value, key, arg, root_arg=True, frecklet=None, is_secret=None, inventory=None ): m = re.findall(VAR_ADAPTER_REGEX, value) if not m: return value, is_secret, False result_string = value is_sec = False for match in m: van = match[2:-2] temp, sec = get_value_from_var_adapter( van, key, arg, root_arg, frecklet=frecklet, is_secret=is_secret, inventory=inventory, ) if sec: is_sec = True result_string = result_string.replace(match, temp, 1) return result_string, is_sec, True
[docs]def get_value_from_var_adapter( var_adapter_name, key, arg, root_arg, frecklet=None, is_secret=None, inventory=None ): if not isinstance(var_adapter_name, six.string_types): raise FrklException( msg="Internal error", reason="Not a var adapter: {}".format(var_adapter_name), ) if var_adapter_name.startswith("value_from_"): if inventory is None: raise FrklException( "Can't retrieve value for var_adapter '{}'.".format(var_adapter_name), reason="No inventory provided.", solution="Don't use the '{}' var adapter in a non parent frecklet.", ) inventory_secrets = inventory.secret_keys() copy_var_name = var_adapter_name[11:] value = inventory.retrieve_value(copy_var_name) secret = is_secret or copy_var_name in inventory_secrets return value, secret else: if var_adapter_name not in VAR_ADAPTERS.keys(): raise FrecklesVarException( frecklet=frecklet, var_name=key, errors={key: "No var adapter '{}'.".format(var_adapter_name)}, solution="Double-check the var adapter name '{}', maybe there's a typo?\n\nIf the name is correct, make sure the python library that contains the var-adapter is installed in the same environment as freckles.".format( var_adapter_name ), ) var_adapter_obj = VAR_ADAPTERS[var_adapter_name] if is_secret is None: is_secret = arg.secret value = var_adapter_obj.retrieve_value( key_name=key, arg=arg, root_arg=root_arg, frecklet=frecklet, is_secret=is_secret, ) return value, is_secret
[docs]@six.add_metaclass(abc.ABCMeta) class VarAdapter(object): def __init__(self): pass
[docs] @abc.abstractmethod def retrieve_value( self, key_name, arg, root_arg, frecklet, is_secret=False, profile_names=None ): pass
[docs]class FreckletPathVarAdapter(VarAdapter): """ An adapter that returns the path to the frecklet that is currently executed. """ def __init__(self): pass
[docs] def retrieve_value( self, key_name, arg, root_arg, frecklet, is_secret=False, profile_names=None ): if not hasattr(frecklet, "full_path"): raise FrklException( msg="Can't resolve variable value 'frecklet_path'.", reason="frecklet in question is dynamic.", solution="Only use the '::frecklet_path::' var adapter in combination with local frecklets.", ) return frecklet.full_path
[docs]class FreckletDirVarAdapter(VarAdapter): """ An adapter that returns the path to the parent directory of the frecklet that is currently executed. """ def __init__(self): pass
[docs] def retrieve_value( self, key_name, arg, root_arg, frecklet, is_secret=False, profile_names=None ): if not hasattr(frecklet, "full_path"): raise FrklException( msg="Can't resolve variable value 'frecklet_dir'.", reason="frecklet in question is dynamic.", solution="Only use the '::frecklet_dir::' var adapter in combination with local frecklets.", ) return os.path.dirname(frecklet.full_path)
[docs]class PwdVarAdapter(VarAdapter): """ An adapter that returns the current directory on the machine where freckles is executed. """ def __init__(self): pass
[docs] def retrieve_value( self, key_name, arg, root_arg, frecklet, is_secret=False, profile_names=None ): return os.getcwd()
[docs]class RandomPasswordVarAdapter(VarAdapter): """ A variable adapter that returns a random password string. """ def __init__(self): pass
[docs] def retrieve_value( self, key_name, arg, root_arg, frecklet, is_secret=False, profile_names=None ): pw = secrets.token_urlsafe(24) return pw
[docs]class AskVarAdapter(VarAdapter): """ A var-adapter that asks the user to enter the value for this variable interactively. """ def __init__(self): pass
[docs] def retrieve_value( self, key_name, arg, root_arg, frecklet, is_secret=False, profile_names=None ): if root_arg: arg_type = arg.type else: arg_type = "string" click_type = FRECKLES_CLICK_CEREBUS_ARG_MAP.get(arg_type, None) if click_type is None: msg = "Don't use the 'ask' variable adapter for this argument." references = { "freckles cli documentation (not yet written)": "https://freckles.io/doc/cli/usage" } if not is_secret: msg = ( msg + "\n\nAs this argument is not a secret, you could use the '-v <var> commandline option." ) else: msg = ( msg + "\n\nThis argument is a secret, maybe you could supply it via the '-v <var>' option in combination with a variable file? Make sure you set secure file permissions for the var file in question ('chmod +x 0700 <var_file>'). Other options to supply passwords are not yet implemented unfortunately, but will be soon." ) references[ "freckles security documentation" ] = "https://freckles.io/doc/security" raise FrecklesVarException( frecklet=frecklet, var_name=key_name, errors={ key_name: "the 'ask' variable adapter does not support argument type '{}'".format( arg_type ) }, solution=msg, references=references, ) short_help = arg.doc.get_short_help( list_item_format=True, use_help=True, default="" ) if not short_help or not root_arg: ending = "'" else: ending = "': " + Style.DIM + short_help + Style.RESET_ALL click.echo( "Input needed for value '" + Style.BRIGHT + key_name + Style.RESET_ALL + ending ) arg_default = arg.default if not root_arg or (arg_default and is_var_adapter(arg_default)): arg_default = None if arg_default: value = click.prompt( " {}".format(key_name), hide_input=is_secret, type=click_type, default=arg_default, ) else: value = click.prompt( " {}".format(key_name), hide_input=is_secret, type=click_type, default=arg_default, ) click.echo() return value
[docs]class VarPathAttribute(TingAttribute): def __init__(self, prefix): self.prefix = prefix
[docs] def provides(self): return ["var_path"]
[docs] def requires(self): return ["var", "prefix"]
[docs] def get_attribute(self, ting, attribute_name=None): return "{}/{}".format(self.prefix, ting.var)
[docs]class VarCast(TingCast): VAR_ATTRS = [] def __init__(self, prefix): var_path_attr = VarPathAttribute(prefix) super(VarCast, self).__init__( "VarTing", ting_attributes=[var_path_attr], ting_id_attr="var_path", mixins=[], )
[docs]@six.add_metaclass(abc.ABCMeta) class Inventory(object): def __init__(self): pass
[docs] @abc.abstractmethod def retrieve_value(self, var_name, **task_context): pass
[docs] @abc.abstractmethod def set_value(self, var_name, new_value, is_secret=False, **task_context): pass
[docs] def get_vars(self, hide_secrets=False, **task_context): if not hide_secrets: return self._get_vars() result = {} secret_keys = self.secret_keys() for k, v in self._get_vars().items(): if k in secret_keys: v = "__secret__" result[k] = v return result
[docs] @abc.abstractmethod def secret_keys(self, **task_context): pass
@abc.abstractmethod def _get_vars(self, **task_context): pass
[docs]class VarsInventory(Inventory): def __init__(self, vars=None, secret_keys=None): super(VarsInventory, self).__init__() if vars is None: vars = [] if not isinstance(vars, Sequence): vars = [vars] self._init_vars_list = vars if secret_keys is None: secret_keys = [] self._secrets = secret_keys self._vars = {} for v in self._init_vars_list: dict_merge(self._vars, v, copy_dct=False)
[docs] def retrieve_value(self, var_name, **task_context): return self._vars.get(var_name, None)
[docs] def set_value(self, var_name, new_value, is_secret=False, **task_context): self._vars[var_name] = new_value if is_secret: self._secrets.append(var_name) if not is_secret and var_name in self._secrets: self._secrets.remove(var_name)
[docs] def secret_keys(self, **task_context): return self._secrets
def _get_vars(self, **task_context): return self._vars
try: VAR_ADAPTERS = load_var_adapters() except (Exception) as e: log.error("Could not load var adapters: {}".format(e)) VAR_ADAPTERS = {}