Source code for freckles.context.config

# -*- coding: utf-8 -*-

import io
import json
import os
from collections import Mapping, Iterable

from ruamel.yaml import YAML

from freckles.defaults import FRECKLES_CONFIG_DIR, FRECKLES_CONFIG_UNLOCK_FILES
from freckles.exceptions import FrecklesUnlockException
from freckles.frecklet.arguments import *  # noqa
from freckles.schemas import FRECKLES_CONTEXT_SCHEMA, PROFILE_LOAD_CONFIG_SCHEMA
from frutils.config.cnf import Cnf

from frutils.exceptions import FrklException
from ting.ting_attributes import (
    FrontmatterAndContentAttribute,
    DictContentAttribute,
    FileStringContentAttribute,
    ValueAttribute,
)
from ting.ting_cast import TingCast
from ting.tings import TingTings

# Logger registered under the name "freckles".
log = logging.getLogger("freckles")

# Module-level ruamel.yaml ``YAML`` instance.
yaml = YAML()

# Marker text that `is_unlocked` searches for (case-insensitively) in the
# unlock files, and that `unlock` writes into the first unlock file.
UNLOCK_STRING = "I know what I'm doing and this is not just copy and pasted from a random blog post on the internet. Also, I accept the freckles license."


def is_unlocked():
    """Tell whether any of the unlock files contains the unlock string.

    Every path in ``FRECKLES_CONFIG_UNLOCK_FILES`` that is an existing file is
    read as utf-8 text and searched (case-insensitively) for
    ``UNLOCK_STRING``.

    Returns:
        bool: True as soon as one file contains the string, False otherwise.
    """
    for candidate in FRECKLES_CONFIG_UNLOCK_FILES:
        if os.path.isfile(candidate):
            with io.open(candidate, "r", encoding="utf-8") as handle:
                text = handle.read()
            if UNLOCK_STRING.lower() in text.lower():
                return True
    return False
def unlock():
    """Write ``UNLOCK_STRING`` to the first file in ``FRECKLES_CONFIG_UNLOCK_FILES``.

    The file is opened in write mode (utf-8), so existing content is replaced.
    """
    target_path = FRECKLES_CONFIG_UNLOCK_FILES[0]
    with io.open(target_path, "w", encoding="utf-8") as handle:
        handle.write(UNLOCK_STRING)
def lock():
    """Delete every file listed in ``FRECKLES_CONFIG_UNLOCK_FILES`` that exists."""
    # The generator keeps the existence check lazy, i.e. it is evaluated right
    # before each removal rather than for all paths up front.
    present = (path for path in FRECKLES_CONFIG_UNLOCK_FILES if os.path.isfile(path))
    for path in present:
        os.unlink(path)
# Repo aliases that close every built-in repo list.
_USER_REPOS = ["user", "./.freckles"]

# 'develop' branch repos used by the 'latest' contexts.
_LATEST_DEFAULT_REPOS = [
    "frecklet::gl:frecklets/frecklets-nsbl-default::develop::",
    "tempting::gl:frecklets/temptings-default::develop::",
    "ansible-role::gl:frecklets/frecklets-nsbl-default-resources::develop::",
    "ansible-tasklist::gl:frecklets/frecklets-nsbl-default-resources::develop::",
]

# Additional 'develop' branch repos used by the 'latest-community' context.
_LATEST_COMMUNITY_REPOS = [
    "frecklet::gl:frecklets/frecklets-nsbl-community::develop::",
    "ansible-role::gl:frecklets/frecklets-nsbl-community-resources::develop::",
    "ansible-tasklist::gl:frecklets/frecklets-nsbl-community::develop::",
]

# Built-in context configurations, keyed by (reserved) context name.
# Every list concatenation below creates a fresh list, so no two entries
# share a mutable object.
DEFAULT_CONFIG_DICTS = {
    "default": {"parents": [{"repos": ["default"] + _USER_REPOS}]},
    "community": {
        "parents": ["default", {"repos": ["default", "community"] + _USER_REPOS}]
    },
    "latest": {
        "parents": ["default", {"repos": _LATEST_DEFAULT_REPOS + _USER_REPOS}]
    },
    "latest-community": {
        "parents": [
            "default",
            {
                "repos": _LATEST_DEFAULT_REPOS
                + _LATEST_COMMUNITY_REPOS
                + _USER_REPOS
            },
        ]
    },
    "shell": {"parents": ["default", {"adapters": ["shell", "freckles"]}]},
    "debug": {
        "parents": [
            "default",
            {
                "keep_run_folder": True,
                "force_show_log": True,
                "create_current_symlink": True,
                "callback": "default::full",
            },
        ]
    },
    "empty": {"parents": [{"repos": []}]},
}
class ContextConfigTingCast(TingCast):
    """:class:`TingCast` used to create freckles profiles from yaml files."""

    # Attributes handed to the cast as ``ting_attributes``.
    CNF_PROFILE_ATTRIBUTES = [
        FileStringContentAttribute(target_attr_name="ting_content"),
        FrontmatterAndContentAttribute(
            content_name="content", source_attr_name="ting_content"
        ),
        ValueAttribute("config_dict", source_attr_name="content"),
        # CnfTingAttribute(),
        DictContentAttribute(
            source_attr_name="content",
            dict_name="config_dict",
            default={},
            copy_default=True,
        ),
    ]

    def __init__(self):
        """Set up the cast for the 'ContextConfigProfile' ting class."""
        cast_options = {
            "class_name": "ContextConfigProfile",
            "ting_attributes": ContextConfigTingCast.CNF_PROFILE_ATTRIBUTES,
            "ting_id_attr": "filename_no_ext",
        }
        super(ContextConfigTingCast, self).__init__(**cast_options)
class ContextConfigs(TingTings):
    """A class to manage freckles profiles.

    This reads all '*.profile' files in the freckles config folder. Those are
    later used to create a freckles context (per profile).

    It also checks whether there exists a 'default.profile' file with the
    'accept_freckles_license' value set to 'true'. Only if that is the case
    will it allow custom profiles (mainly for security reasons - the user
    should explicitly accept that certain configurations can be insecure).

    NOTE(review): the paragraphs above are carried over from the original
    docstring; the file extension and the license check are not visible in
    this class and should be confirmed against the loader configuration.

    In addition to the user-provided configs, a set of built-in configs
    (``DEFAULT_CONFIG_DICTS`` unless overridden) is available. Their names are
    reserved: a user config with the same id is rejected in ``__init__``.

    Args:
        repos (str, list): a list of local folders containing '*.context' files

    Returns:
        TingTings: an index of config files
    """

    DEFAULT_TING_CAST = ContextConfigTingCast

    @classmethod
    def load_user_context_configs(cls, repos=None):
        """Create a ``ContextConfigs`` index from folders.

        Args:
            repos: folder(s) passed on to ``from_folders``; defaults to
                ``FRECKLES_CONFIG_DIR`` when None.

        Returns:
            the object returned by ``ContextConfigs.from_folders``.
        """
        if repos is None:
            repos = FRECKLES_CONFIG_DIR

        cnf = Cnf({})
        # this is the 'root' configuration for the actual config object
        cnf.add_interpreter("root_config", FRECKLES_CONTEXT_SCHEMA)
        profile_load_config = cnf.add_interpreter(
            "profile_load", PROFILE_LOAD_CONFIG_SCHEMA
        )

        return ContextConfigs.from_folders(
            "user_contexts", repos, load_config=profile_load_config, cnf=cnf
        )

    def __init__(self, repo_name, tingsets, cnf, default_config_dicts=None, **kwargs):
        """Validate the loaded tings and initialise the index.

        Args:
            repo_name (str): name of this ting repo (used in error messages and
                passed on to the parent class).
            tingsets: iterable of mappings whose values are tings with an
                ``id`` attribute.
            cnf: configuration object that must provide the 'profile_load' and
                'root_config' interpreters.
            default_config_dicts (dict): built-in configs keyed by reserved
                name; defaults to ``DEFAULT_CONFIG_DICTS``. A deep copy is kept.
            **kwargs: accepted and ignored.

        Raises:
            FrklException: if a ting id equals a reserved (built-in) name.
            Exception: if ``cnf`` is None or lacks a required interpreter.
        """
        if default_config_dicts is None:
            default_config_dicts = DEFAULT_CONFIG_DICTS
        # Deep copy so later changes never leak into the shared module constant.
        self.default_config_dicts = copy.deepcopy(default_config_dicts)

        invalid = [
            ting.id
            for tings in tingsets
            for ting in tings.values()
            if ting.id in self.default_config_dicts
        ]

        if invalid:
            raise FrklException(
                msg="Invalid context name(s) for '{}': {}".format(
                    repo_name, ", ".join(invalid)
                ),
                # 'repo_name' is a single string; joining it would split it
                # into its characters.
                reason="Context config files named after reserved context names in repo: {}".format(
                    repo_name
                ),
                solution="Rename affected context configs.",
            )

        if cnf is None:
            raise Exception("Base configuration object can't be None.")

        if "profile_load" not in cnf.get_interpreter_names():
            raise Exception("No 'profile_load' cnf interpreter available.")
        load_config = cnf.get_interpreter("profile_load")

        if "root_config" not in cnf.get_interpreter_names():
            raise Exception("No root_config profile interpreter in cnf.")

        self._cnf = cnf
        self._root_config = cnf.get_interpreter("root_config").config

        super(ContextConfigs, self).__init__(
            repo_name=repo_name,
            tingsets=tingsets,
            load_config=load_config,
            indexes=["filename_no_ext"],
        )

    def get_config_type(self, config_obj):
        """Classify a config object.

        Strings are stripped before classification.

        Args:
            config_obj: a string or a mapping.

        Returns:
            str: one of "json" (string wrapped in curly braces), "key_value"
            (string containing '='), "default_config_dict" (name of a built-in
            config), "user_config_dict" (name of a config in this index) or
            "dict" (a mapping).

        Raises:
            FrklException: if a string matches none of the above, or if the
                object is neither a string nor a mapping.
        """
        if isinstance(config_obj, string_types):
            config_obj = config_obj.strip()
            if config_obj.startswith("{") and config_obj.endswith("}"):
                return "json"
            elif "=" in config_obj:
                return "key_value"
            elif config_obj in self.default_config_dicts:
                return "default_config_dict"
            elif config_obj in self.keys():
                return "user_config_dict"
            else:
                all_context_names = list(self.default_config_dicts.keys()) + list(
                    self.keys()
                )
                raise FrklException(
                    msg="Can't determine config type for string '{}'".format(
                        config_obj
                    ),
                    solution="Value needs to be either json string, key/value pair (separated with '='), or the name of a default or user context config: {}".format(
                        ", ".join(all_context_names)
                    ),
                )
        elif isinstance(config_obj, Mapping):
            return "dict"
        else:
            raise FrklException(
                msg="Invalid config type '{}' for object: {}".format(
                    type(config_obj), config_obj
                ),
                solution="Needs to be either string, or dict.",
            )

    def resolve_context_config_dict(self, config_chain, current_config_dict=None):
        """Resolve a chain of config objects into a single dict.

        Each item is converted into a dict according to its type (see
        ``get_config_type``). Its 'parents' are resolved recursively first,
        then the item itself is merged via ``dict_merge``, and finally its
        'extra_repos' are appended to the 'repos' list.

        For key/value strings the value is converted to a bool for
        'true'/'yes'/'false'/'no' (case-insensitive), to an int if it parses as
        one, and is kept as a string otherwise.

        Args:
            config_chain: iterable of config objects (strings or mappings).
            current_config_dict (dict): dict to merge into; it is modified and
                returned. A new dict is used when None.

        Returns:
            dict: the merged configuration.

        Raises:
            FrklException: for unparsable json, a key/value string with an
                empty value, or an item of unknown type.
        """
        if current_config_dict is None:
            current_config_dict = {}

        for config in config_chain:

            if isinstance(config, string_types):
                # 'get_config_type' classifies the stripped string, so the
                # lookups and splits below have to use the same stripped value.
                config = config.strip()

            config_type = self.get_config_type(config)

            if config_type == "dict":
                config_dict = copy.deepcopy(config)
            elif config_type == "json":
                try:
                    config_dict = json.loads(config)
                except ValueError:
                    raise FrklException(
                        msg="Can't parse json config object: {}".format(config),
                        solution="Check json format.",
                    )
            elif config_type == "key_value":
                key, value = config.split("=", 1)
                if value == "":
                    raise FrklException(
                        msg="No value provided for key/value config object: {}".format(
                            config
                        )
                    )
                if value.lower() in ["true", "yes"]:
                    value = True
                elif value.lower() in ["false", "no"]:
                    value = False
                else:
                    try:
                        value = int(value)
                    except ValueError:
                        # fine, we'll just use the string
                        # TODO: support lists
                        pass
                config_dict = {key: value}
            elif config_type == "default_config_dict":
                config_dict = copy.deepcopy(self.default_config_dicts[config])
            elif config_type == "user_config_dict":
                config_dict = copy.deepcopy(self.get(config).config_dict)
            else:
                raise FrklException(msg="Invalid config type: {}".format(config_type))

            config_parents = config_dict.pop("parents", [])
            if not config_parents:
                config_parents = []
            elif isinstance(config_parents, (string_types, Mapping)):
                # A single parent: wrap it, otherwise a string would be
                # iterated character by character.
                config_parents = [config_parents]

            config_extra_repos = config_dict.pop("extra_repos", [])
            if isinstance(config_extra_repos, string_types) or not isinstance(
                config_extra_repos, Sequence
            ):
                config_extra_repos = [config_extra_repos]

            # Parents first, so that the current item overrides their values.
            self.resolve_context_config_dict(
                config_chain=config_parents, current_config_dict=current_config_dict
            )

            dict_merge(current_config_dict, config_dict, copy_dct=False)

            if config_extra_repos:
                current_config_dict.setdefault("repos", []).extend(config_extra_repos)

        return current_config_dict

    def create_context_config(self, context_name, config_chain=None, extra_repos=None):
        """Creates a new context configuration out of the provided config_chain and extra repos.

        The 'default' config is always used as the first item of the chain; it
        is prepended unless the chain already starts with it. The provided
        ``config_chain`` object itself is never modified.

        Args:
            context_name: alias for the new ``ContextConfig``.
            config_chain: a single config object (string or mapping) or an
                iterable of them; None means an empty chain.
            extra_repos: passed on to ``ContextConfig``.

        Returns:
            ContextConfig: the new context configuration.
        """
        if config_chain is None:
            config_chain = []
        elif isinstance(config_chain, (string_types, Mapping)) or not isinstance(
            config_chain, Iterable
        ):
            config_chain = [config_chain]
        else:
            # Copy, so that inserting 'default' below does not alter the
            # caller's list, and so that any iterable supports indexing.
            config_chain = list(config_chain)

        if not config_chain or config_chain[0] != "default":
            config_chain.insert(0, "default")

        final_config = self.resolve_context_config_dict(config_chain=config_chain)

        return ContextConfig(
            alias=context_name, config_dict=final_config, extra_repos=extra_repos
        )
class ContextConfig(object):
    """Configuration of a single freckles context.

    Wraps a config dict in ``Cnf`` objects and guards access to configuration
    values while the configuration is not unlocked.
    """

    def __init__(self, alias, config_dict, extra_repos=None, config_unlocked=None):
        """Set up the configuration objects.

        Args:
            alias: name of this context config.
            config_dict (dict): the configuration values. NOTE: this dict is
                modified in place (a 'repos' list is ensured and
                ``extra_repos`` are appended to it).
            extra_repos: a single repo or a sequence of repos to add to
                'repos' (duplicates are skipped).
            config_unlocked (bool): explicit lock state; any non-bool value
                means the state is determined with ``is_unlocked()``.
        """
        self._default_config = Cnf({})
        self._root_config = Cnf(config_dict)

        if isinstance(config_unlocked, bool):
            self._config_unlocked = config_unlocked
        else:
            self._config_unlocked = is_unlocked()

        self._alias = alias

        if not extra_repos:
            extra_repos = []
        if isinstance(extra_repos, string_types) or not isinstance(
            extra_repos, Sequence
        ):
            extra_repos = [extra_repos]
        self._extra_repos = extra_repos

        config_dict.setdefault("repos", [])
        for r in self._extra_repos:
            if r not in config_dict["repos"]:
                config_dict["repos"].append(r)

        self._config_dict = config_dict
        self._cnf = Cnf(self._config_dict)
        self.add_cnf_interpreter("context", FRECKLES_CONTEXT_SCHEMA)

    @property
    def cnf(self):
        """The ``Cnf`` object wrapping the (final) config dict."""
        return self._cnf

    def add_cnf_interpreter(self, interpreter_name, schema):
        """Register an interpreter on all three internal ``Cnf`` objects.

        Args:
            interpreter_name: name of the interpreter.
            schema: schema for the interpreter.
        """
        # we need that to compare with 'no config' option
        self._root_config.add_interpreter(interpreter_name, schema)
        self._default_config.add_interpreter(interpreter_name, schema)

        self._cnf.add_interpreter(interpreter_name, schema)

    def config(self, interpreter_name, *overlays):
        """Return the interpreter's config with the overlays applied.

        Args:
            interpreter_name: interpreter to use; None means "context".
            *overlays: passed on to the interpreter's ``overlay`` method.

        Returns:
            the result of ``interpreter.overlay(*overlays)``.

        Raises:
            FrecklesUnlockException: if the configuration is locked and a key
                not tagged 'safe' has a value that differs from the root
                config.
        """
        if interpreter_name is None:
            interpreter_name = "context"

        interpreter = self._cnf.get_interpreter(interpreter_name)
        config = interpreter.overlay(*overlays)

        if self._config_unlocked:
            return config

        not_allowed = []
        # NOTE(review): the overlaid value 'v' is not used; the comparison
        # reads the value from the interpreter instead. If the intent is to
        # validate the overlays, 'v' should probably be compared - confirm
        # before changing.
        for k, v in config.items():
            tags = interpreter.get_tags(k)
            value = interpreter.get(k)
            if "safe" not in tags:
                orig_value = self._root_config.get_interpreter_value(
                    interpreter_name, k, None
                )
                if value != orig_value:
                    not_allowed.append(k)

        if not_allowed:
            raise FrecklesUnlockException(
                "Access prevented to configuration key(s): {}.".format(
                    ", ".join(not_allowed)
                )
            )

        return config

    def config_value(self, key, interpreter_name=None):
        """Return a single configuration value.

        Args:
            key: the configuration key.
            interpreter_name: interpreter to use; None means "context".

        Returns:
            the value the interpreter holds for ``key``.

        Raises:
            FrecklesUnlockException: if the configuration is locked, the key
                is not tagged 'safe' and its value differs from the value of
                the empty default config.
        """
        if interpreter_name is None:
            interpreter_name = "context"

        interpreter = self._cnf.get_interpreter(interpreter_name)

        if self._config_unlocked:
            return interpreter.get(key)

        # check whether this config value is 'safe'
        try:
            tags = interpreter.get_tags(key)
        except (Exception):
            # best effort: a key without retrievable tags is treated as not safe
            tags = []

        value = interpreter.get(key)

        if "safe" not in tags:
            orig_value = self._default_config.get_interpreter_value(
                interpreter_name, key, None
            )
            if value != orig_value:
                val = str(value)
                # Only short values are echoed in the message. Long values are
                # left out rather than glued unformatted behind the key name.
                if len(val) < 12:
                    detail = " with (non-default) value '{}'".format(val)
                else:
                    detail = ""
                raise FrecklesUnlockException(
                    "Setting of configuration key '{}'{} not allowed.".format(
                        key, detail
                    )
                )

        return value