# -*- coding: utf-8 -*-
import logging
from collections import OrderedDict, Sequence
import click
from jinja2 import TemplateSyntaxError
from six import string_types
from freckles.defaults import DEFAULT_FRECKLES_JINJA_ENV, FRECKLES_DEFAULT_ARG_SCHEMA
from freckles.exceptions import FreckletBuildException, FreckletException
from frutils import get_template_keys, dict_merge
from frutils.parameters import VarsTypeSimple
from ting.ting_attributes import TingAttribute, Arg
from ting.ting_cast import MultiCacheResult
log = logging.getLogger("freckles")
[docs]class CliArgumentsAttribute(TingAttribute):
CLICK_CEREBUS_ARG_MAP = {
"string": str,
"float": float,
"integer": int,
"boolean": bool,
"dict": VarsTypeSimple(),
"password": str,
# "list": list
}
DEFAULT_CLI_SCHEMA = {"show_default": True, "param_type": "option"}
def __init__(
self, target_attr_name="cli_arguments", source_attr_name="vars_frecklet"
):
self.target_attr_name = target_attr_name
self.source_attr_name = source_attr_name
[docs] def provides(self):
return [self.target_attr_name]
[docs] def requires(self):
return [self.source_attr_name]
[docs] def get_attribute(self, ting, attribute_name=None):
# TODO: validate
vars = getattr(ting, self.source_attr_name)
result = []
for var_name, var in vars.items():
parameter = self.create_parameter(var_name, var)
if parameter is not None:
result.append(parameter)
required = []
optional = []
for p in result:
if p.required:
required.append(p)
else:
optional.append(p)
return required + optional
[docs] def create_parameter(self, var_name, var):
if not var.cli.get("enabled", True):
return None
option_properties = dict_merge(
CliArgumentsAttribute.DEFAULT_CLI_SCHEMA, var.cli, copy_dct=True
)
param_type = option_properties.pop("param_type")
if var.default is not None:
option_properties["default"] = var.default
option_properties["required"] = var.required
auto_param_decls = False
if "param_decls" not in option_properties.keys():
auto_param_decls = True
if param_type == "option":
decls = []
for a in var.aliases:
if len(a) == 1:
decls.append("-{}".format(a))
else:
a = a.replace("_", "-")
decls.append("--{}".format(a))
option_properties["param_decls"] = decls
else:
option_properties["param_decls"] = [var.key]
if "metavar" not in option_properties.keys():
option_properties["metavar"] = var.key.upper()
# setting type
cerberus_type = var.type
if not isinstance(cerberus_type, string_types) and isinstance(
cerberus_type, Sequence
):
replacement = None
cerberus_type = "multi"
else:
replacement = CliArgumentsAttribute.CLICK_CEREBUS_ARG_MAP.get(
cerberus_type, None
)
if replacement is not None:
if replacement == bool:
option_properties["type"] = None
option_properties["default"] = None
if "is_flag" not in option_properties.keys():
option_properties["is_flag"] = True
# we don't add the type here, otherwise click fails for whatever reason
if "param_decls" not in option_properties.keys() or auto_param_decls:
temp = var.key.replace("_", "-")
option_properties["param_decls"] = [
"--{}/--no-{}".format(temp, temp)
]
else:
option_properties["type"] = replacement
elif cerberus_type == "list":
arg_schema = var.schema.get("schema", {})
schema_type = arg_schema.get("type", "string")
replacement = CliArgumentsAttribute.CLICK_CEREBUS_ARG_MAP.get(
schema_type, click.STRING
)
option_properties["type"] = replacement
if param_type == "option":
option_properties["multiple"] = True
else:
if (
"nargs" not in option_properties.keys()
and "default" not in option_properties.keys()
):
option_properties["nargs"] = -1
elif cerberus_type != "multi":
raise Exception("Type '{}' not implemented yet.".format(cerberus_type))
if var.secret:
if "default" not in option_properties.keys():
if option_properties["required"]:
option_properties["default"] = "ask"
option_properties["show_default"] = True
if param_type == "option":
option_properties["help"] = var.doc.get_short_help()
p = click.Option(**option_properties)
else:
option_properties.pop("show_default", None)
if (
"nargs" in option_properties.keys()
and "default" in option_properties.keys()
):
log.warning(
"Removing 'nargs' property from argument '{}' ('nargs' & 'default' are not allowed together)".format(
var_name
)
)
option_properties.pop("nargs")
p = click.Argument(**option_properties)
return p
[docs]class VariablesFilterAttribute(TingAttribute):
def __init__(
self,
source_attr_name="vars_frecklet",
target_attr_name="vars_required",
required=True,
):
self.source_attr_name = source_attr_name
self.target_attr_name = target_attr_name
self.required = required
[docs] def requires(self):
return [self.source_attr_name]
[docs] def provides(self):
return [self.target_attr_name]
[docs] def get_attribute(self, ting, attribute_name=None):
vars = getattr(ting, self.source_attr_name)
result = OrderedDict()
for var_name, arg_obj in vars.items():
req = arg_obj.required
if req and self.required:
result[var_name] = arg_obj
elif not req and not self.required:
result[var_name] = arg_obj
return result
[docs]class VariablesAttribute(TingAttribute):
def __init__(
self, target_attr_name="vars_frecklet", default_argument_description=None
):
self.target_attr_name = target_attr_name
self.default_argument_description = default_argument_description
[docs] def provides(self):
return [self.target_attr_name, "{}_tree".format(self.target_attr_name)]
[docs] def requires(self):
return ["task_tree"]
[docs] def get_attribute(self, ting, attribute_name=None):
task_tree = ting.task_tree
paths_to_leaves = task_tree.paths_to_leaves()
vars_tree = {}
for path in paths_to_leaves:
leaf_node_id = path[-1]
args = self.get_vars_from_path(path_to_leaf=path, tree=task_tree, ting=ting)
vars_tree[leaf_node_id] = args
vars = self.consolidate_vars(vars_tree, ting)
result = {
"{}_tree".format(self.target_attr_name): vars_tree,
self.target_attr_name: vars,
}
return MultiCacheResult(**result)
[docs] def consolidate_vars(self, vars_tree, ting):
result = {}
for args in vars_tree.values():
for arg_name, arg in args.items():
if (
arg_name in result.keys()
and not result[arg_name].is_auto_arg
and not arg.is_auto_arg
and result[arg_name].schema != arg.schema
):
# import pp
# pp(arg.__dict__)
# pp(result[arg_name].__dict__)
raise FreckletBuildException(
frecklet=ting,
msg="Duplicate arg '{}'.".format(arg_name),
solution="Check format of frecklet '{}'.".format(ting.id),
references={
"frecklet documentation": "https://freckles.io/doc/frecklets/anatomy"
},
)
if arg_name not in result.keys() or not arg.is_auto_arg:
result[arg_name] = arg
ordered = OrderedDict()
for k in sorted(result.keys()):
ordered[k] = result[k]
return ordered
[docs] def resolve_vars(self, current_args, rest_path, last_node, tree, ting):
current_node_id = next(rest_path)
# print("CURRENT: {}".format(current_node_id))
current_node = tree.get_node(current_node_id)
if current_node_id == 0:
vars = {}
for key in current_args.keys():
vars[key] = "{{{{:: {} ::}}}}".format(key)
else:
vars = current_node.data["task"]["vars"]
if current_node_id == 0:
available_args = current_node.data.args
else:
available_args = current_node.data["task"]["args"]
args = {}
for key, arg in current_args.items():
if key not in vars.keys():
if arg.required and arg.default is None:
# relevant frecklet root name: tree.get_node(0).tag,
raise FreckletBuildException(
frecklet=ting,
msg="Invalid frecklet-task '{}' in '{}': does not provide required/non-defaulted var '{}' for child frecklet '{}'.".format(
current_node.tag, tree.get_node(0).tag, key, last_node.tag
),
solution="Check format of frecklet '{}'.".format(
current_node.tag
),
references={
"frecklet documentation": "https://freckles.io/doc/frecklets/anatomy"
},
)
# means we don't have to worry about this key, as it is not used and not required
continue
else:
parent_var = vars[key]
# print("PARENT VAR: {}".format(parent_var))
if parent_var == "{{{{:: {} ::}}}}".format(key):
# this means the var is just being carried forward, from the point of view of the parent frecklet task
arg_config = available_args.get(key, None)
if arg_config is None:
# print("USING CHILD ARG")
new_arg = arg
else:
new_arg = Arg(
key, arg_config, default_schema=FRECKLES_DEFAULT_ARG_SCHEMA
)
new_arg.add_child(arg)
new_arg.var_template = parent_var
args[key] = new_arg
else:
template_keys = get_template_keys(
parent_var, jinja_env=DEFAULT_FRECKLES_JINJA_ENV
)
for tk in template_keys:
arg_config = available_args.get(tk, None)
if arg_config is None:
if tk == key:
new_arg = arg
else:
new_arg = Arg(
tk,
{},
default_schema=FRECKLES_DEFAULT_ARG_SCHEMA,
is_auto_arg=True,
)
new_arg.add_child(arg)
else:
new_arg = Arg(
tk,
arg_config,
default_schema=FRECKLES_DEFAULT_ARG_SCHEMA,
)
new_arg.add_child(arg)
new_arg.var_template = parent_var
args[tk] = new_arg
if current_node_id != 0:
r = self.resolve_vars(
current_args=args,
rest_path=rest_path,
last_node=current_node,
tree=tree,
ting=ting,
)
return r
else:
root_task = last_node.data["task"]
try:
tks = get_template_keys(root_task, jinja_env=DEFAULT_FRECKLES_JINJA_ENV)
except (TemplateSyntaxError) as e:
raise FreckletException(ting, e, ting.id)
root_tks = {}
for tk in tks:
if tk in args.keys():
continue
# add a template key that is under either 'frecklet' or 'task' of the root task of a frecklet
arg = available_args.get(tk, None)
auto_arg = False
if arg is None:
if self.default_argument_description is None:
f_name = current_node.data.id
raise Exception(
"No argument description for argument '{}' in frecklet '{}'".format(
tk, f_name
)
)
else:
arg = self.default_argument_description
auto_arg = True
arg = Arg(
tk,
arg,
default_schema=FRECKLES_DEFAULT_ARG_SCHEMA,
is_auto_arg=auto_arg,
)
root_tks[tk] = arg
dict_merge(args, root_tks, copy_dct=False)
return args
[docs] def get_vars_from_path(self, path_to_leaf, tree, ting):
root = path_to_leaf[-1]
root_node = tree.get_node(root)
# end = path_to_leave[0]
available_args = root_node.data["root_frecklet"].args
template_keys = root_node.data["root_frecklet"].template_keys
args = Arg.from_keys(
template_keys, available_args, default_schema=FRECKLES_DEFAULT_ARG_SCHEMA
)
rest_path = reversed(path_to_leaf[0:-1])
return self.resolve_vars(
current_args=args,
rest_path=rest_path,
last_node=root_node,
tree=tree,
ting=ting,
)