Source code for freckles.frecklet.arguments

# -*- coding: utf-8 -*-
import copy
import logging
from collections import OrderedDict, Sequence

import click
from jinja2 import TemplateSyntaxError
from six import string_types

from freckles.defaults import DEFAULT_FRECKLES_JINJA_ENV, FRECKLES_DEFAULT_ARG_SCHEMA
from freckles.exceptions import FreckletBuildException, FreckletException
from freckles.frecklet.vars import is_var_adapter, FRECKLES_CLICK_CEREBUS_ARG_MAP
from frutils import get_template_keys, dict_merge, readable_yaml
from ting.ting_attributes import TingAttribute, Arg
from ting.ting_cast import MultiCacheResult

log = logging.getLogger("freckles")


[docs]class CliArgumentsAttribute(TingAttribute): DEFAULT_CLI_SCHEMA = {"show_default": True, "param_type": "option"} def __init__(self, target_attr_name="cli_arguments", source_attr_name="vars"): self.target_attr_name = target_attr_name self.source_attr_name = source_attr_name
[docs] def provides(self): return [self.target_attr_name]
[docs] def requires(self): return [self.source_attr_name]
[docs] def get_attribute(self, ting, attribute_name=None): # TODO: validate vars = getattr(ting, self.source_attr_name) result = [] for var_name, var in vars.items(): parameter = self.create_parameter(var_name, var) if parameter is not None: result.append(parameter) required = [] optional = [] for p in result: if p.required: required.append(p) else: optional.append(p) return required + optional
[docs] def create_parameter(self, var_name, var): if not var.cli.get("enabled", True): return None option_properties = dict_merge( CliArgumentsAttribute.DEFAULT_CLI_SCHEMA, var.cli, copy_dct=True ) param_type = option_properties.pop("param_type") if not var.required: option_properties["required"] = False else: if var.schema.get("excludes", []): option_properties["required"] = False else: option_properties["required"] = True auto_param_decls = False if "param_decls" not in option_properties.keys(): auto_param_decls = True if param_type == "option": decls = [] for a in var.aliases: if len(a) == 1: decls.append("-{}".format(a)) else: a = a.replace("_", "-") decls.append("--{}".format(a)) option_properties["param_decls"] = decls else: option_properties["param_decls"] = [var.key] if "metavar" not in option_properties.keys(): option_properties["metavar"] = var.key.upper() # setting type cerberus_type = var.type if not isinstance(cerberus_type, string_types) and isinstance( cerberus_type, Sequence ): replacement = None cerberus_type = "multi" else: replacement = FRECKLES_CLICK_CEREBUS_ARG_MAP.get(cerberus_type, None) if replacement is not None: if replacement == bool: option_properties["type"] = None option_properties["default"] = None if "is_flag" not in option_properties.keys(): option_properties["is_flag"] = True # we don't add the type here, otherwise click fails for whatever reason if "param_decls" not in option_properties.keys() or auto_param_decls: temp = var.key.replace("_", "-") option_properties["param_decls"] = [ "--{}/--no-{}".format(temp, temp) ] else: option_properties["type"] = replacement elif cerberus_type == "list": arg_schema = var.schema.get("schema", {}) schema_type = arg_schema.get("type", "string") replacement = FRECKLES_CLICK_CEREBUS_ARG_MAP.get(schema_type, click.STRING) option_properties["type"] = replacement if param_type == "option": option_properties["multiple"] = True else: if ( "nargs" not in option_properties.keys() and "default" not in option_properties.keys() ): option_properties["nargs"] = -1 elif cerberus_type != "multi": raise Exception("Type '{}' not implemented yet.".format(cerberus_type)) # if var.secret: # if "default" not in option_properties.keys(): # if option_properties["required"]: # option_properties["default"] = "::ask::" # option_properties["show_default"] = True if option_properties.get("default", None) is not None: default_val = option_properties["default"] if is_var_adapter(default_val): option_properties["type"] = str if var.secret: option_properties["show_default"] = False if param_type == "option": option_properties["help"] = var.doc.get_short_help() p = click.Option(**option_properties) else: option_properties.pop("show_default", None) if ( "nargs" in option_properties.keys() and "default" in option_properties.keys() ): log.warning( "Removing 'nargs' property from argument '{}' ('nargs' & 'default' are not allowed together)".format( var_name ) ) option_properties.pop("nargs") p = click.Argument(**option_properties) return p
[docs]class VariablesFilterAttribute(TingAttribute): def __init__( self, source_attr_name="vars", target_attr_name="vars_required", required=True ): self.source_attr_name = source_attr_name self.target_attr_name = target_attr_name self.required = required
[docs] def requires(self): return [self.source_attr_name]
[docs] def provides(self): return [self.target_attr_name]
[docs] def get_attribute(self, ting, attribute_name=None): vars = getattr(ting, self.source_attr_name) result = OrderedDict() for var_name, arg_obj in vars.items(): req = arg_obj.required if req and self.required: result[var_name] = arg_obj elif not req and not self.required: result[var_name] = arg_obj return result
ALLOWED_ARG_INHERIT_STRATEGIES = ["strict", "required"]
[docs]class VarsAttribute(TingAttribute): def __init__(self): pass
[docs] def provides(self): return ["vars"]
[docs] def requires(self): return ["vars_frecklet", "vars_const", "const"]
[docs] def get_attribute(self, ting, attribute_name=None): result = {} for k, v in ting.vars_const.items(): result[k] = v for k, v in ting.vars_frecklet.items(): if k in ting.const.keys(): continue if k not in result.keys(): result[k] = v return result
[docs]class VariablesAttribute(TingAttribute): def __init__( self, target_attr_name="vars_frecklet", default_argument_description=None ): self.target_attr_name = target_attr_name self.default_argument_description = default_argument_description
[docs] def provides(self): return [self.target_attr_name, "{}_tree".format(self.target_attr_name)]
[docs] def requires(self): return ["task_tree", "const"]
[docs] def get_attribute(self, ting, attribute_name=None): task_tree = ting.task_tree paths_to_leaves = task_tree.paths_to_leaves() vars_tree = {} for path in paths_to_leaves: leaf_node_id = path[-1] args = self.get_vars_from_path(path_to_leaf=path, tree=task_tree, ting=ting) vars_tree[leaf_node_id] = args vars = self.consolidate_vars(vars_tree, ting) result = { "{}_tree".format(self.target_attr_name): vars_tree, self.target_attr_name: vars, } return MultiCacheResult(**result)
[docs] def consolidate_vars(self, vars_tree, ting): result = {} for args in vars_tree.values(): for arg_name, arg in args.items(): if ( arg_name in result.keys() and not result[arg_name].is_auto_arg and not arg.is_auto_arg and result[arg_name].schema != arg.schema ): reason = ( "Two different argument schemes for the same argument name:\n\n" ) reason = ( reason + readable_yaml(result[arg_name].schema, indent=2) + "\n -------------------------------\n\n" ) reason = reason + readable_yaml(arg.schema, indent=2) raise FreckletBuildException( frecklet=ting, msg="Duplicate arg '{}'.".format(arg_name), solution="Check format of frecklet '{}'.".format(ting.id), reason=reason, references={ "frecklet documentation": "https://freckles.io/doc/frecklets/anatomy" }, ) if arg_name not in result.keys() or not arg.is_auto_arg: result[arg_name] = arg ordered = OrderedDict() for k in sorted(result.keys()): ordered[k] = result[k] return ordered
[docs] def create_new_args_map(self, parent_var, key, available_args, arg): args = {} if parent_var == "{{{{:: {} ::}}}}".format(key): # this means the var is just being carried forward, from the point of view of the parent frecklet task arg_config = available_args.get(key, None) if arg_config is None: # print("USING CHILD ARG") new_arg = arg else: new_arg = Arg( key, arg_config, default_schema=FRECKLES_DEFAULT_ARG_SCHEMA ) new_arg.add_child(arg) new_arg.var_template = parent_var args[key] = new_arg else: template_keys = get_template_keys( parent_var, jinja_env=DEFAULT_FRECKLES_JINJA_ENV ) for tk in template_keys: arg_config = available_args.get(tk, None) if arg_config is None: if tk == key: new_arg = arg else: new_arg = Arg( tk, {}, default_schema=FRECKLES_DEFAULT_ARG_SCHEMA, is_auto_arg=True, ) new_arg.add_child(arg) else: new_arg = Arg( tk, arg_config, default_schema=FRECKLES_DEFAULT_ARG_SCHEMA ) new_arg.add_child(arg) new_arg.var_template = parent_var args[tk] = new_arg return args
[docs] def resolve_vars( self, current_args, rest_path, last_node, tree, ting, is_root_level ): current_node_id = next(rest_path) current_node = tree.get_node(current_node_id) if current_node_id == 0: vars = {} for key in current_args.keys(): vars[key] = "{{{{:: {} ::}}}}".format(key) else: vars = current_node.data["task"]["vars"] if current_node_id == 0: available_args = current_node.data.args else: available_args = current_node.data["task"]["args"] args = {} for key, arg in current_args.items(): # if key in ting.const.keys(): # parent_var = ting.const[key] # print("---") # print(parent_var) if key in vars.keys(): parent_var = vars[key] elif key not in vars.keys(): inherit = ting.meta.get("vars", {}).get("inherit", False) if inherit: vars[key] = "{{{{:: {} ::}}}}".format(key) parent_var = vars[key] else: if arg.required and arg.default: # relevant frecklet root name: tree.get_node(0).tag, raise FreckletBuildException( frecklet=ting, msg="Invalid frecklet-task '{}' in '{}': does not provide required/non-defaulted var '{}' for child frecklet '{}'.".format( current_node.tag, tree.get_node(0).tag, key, last_node.tag, ), solution="Check format of frecklet '{}'.".format( current_node.tag ), references={ "frecklet documentation": "https://freckles.io/doc/frecklets/anatomy" }, ) else: continue new_args = self.create_new_args_map( parent_var=parent_var, key=key, available_args=available_args, arg=arg ) for k, v in new_args.items(): # if k in args.keys(): # log.warning("Duplicate arg key: {}".format(k)) args[k] = v if current_node_id != 0: r = self.resolve_vars( current_args=args, rest_path=rest_path, last_node=current_node, tree=tree, ting=ting, is_root_level=False, ) return r else: # new_args = {} # replaced = [] # for key, arg in args.items(): # if key in ting.const.keys(): # replaced.append(key) # parent_var = ting.const[key] # temp_args = self.create_new_args_map(parent_var=parent_var, key=key, available_args=available_args, arg=args[key]) # for k, v in temp_args.items(): # new_args[k] = v # else: # new_args[key] = arg # # args = new_args root_task = last_node.data["task"] try: tks = get_template_keys(root_task, jinja_env=DEFAULT_FRECKLES_JINJA_ENV) except (TemplateSyntaxError) as e: raise FreckletException(ting, e, ting.id) # for r in replaced: # tks.remove(r) root_tks = {} for tk in tks: if tk in args.keys(): continue # add a template key that is under either 'frecklet' or 'task' of the root task of a frecklet arg = available_args.get(tk, None) auto_arg = False if arg is None: if self.default_argument_description is None: f_name = current_node.data.id raise Exception( "No argument description for argument '{}' in frecklet '{}'".format( tk, f_name ) ) else: arg = copy.copy(self.default_argument_description) auto_arg = True arg = Arg( tk, arg, default_schema=FRECKLES_DEFAULT_ARG_SCHEMA, is_auto_arg=auto_arg, ) root_tks[tk] = arg dict_merge(args, root_tks, copy_dct=False) return args
[docs] def get_vars_from_path(self, path_to_leaf, tree, ting): root = path_to_leaf[-1] root_node = tree.get_node(root) # end = path_to_leave[0] available_args = root_node.data["root_frecklet"].args template_keys = root_node.data["root_frecklet"].template_keys args = Arg.from_keys( template_keys, available_args, default_schema=FRECKLES_DEFAULT_ARG_SCHEMA ) rest_path = reversed(path_to_leaf[0:-1]) resolved = self.resolve_vars( current_args=args, rest_path=rest_path, last_node=root_node, tree=tree, ting=ting, is_root_level=True, ) return resolved