# -*- coding: utf-8 -*-
import io
import json
import os
from collections import Mapping, Iterable
from ruamel.yaml import YAML
from six import string_types
from freckles.defaults import ACCEPT_FRECKLES_LICENSE_KEYNAME, FRECKLES_CONFIG_DIR
from freckles.exceptions import FrecklesConfigException, FrecklesUnlockException
from freckles.frecklet.arguments import * # noqa
from freckles.schemas import FRECKLES_CONTEXT_SCHEMA, PROFILE_LOAD_CONFIG_SCHEMA
from frutils.config.cnf import Cnf
# from .output_callback import DefaultCallback
from ting.ting_attributes import (
FrontmatterAndContentAttribute,
DictContentAttribute,
FileStringContentAttribute,
ValueAttribute,
)
from ting.ting_cast import TingCast
from ting.tings import TingTings
log = logging.getLogger("freckles")
yaml = YAML()
[docs]class ContextConfigTingCast(TingCast):
"""A :class:`TingCast` to create freckles profiles by reading yaml files."""
CNF_PROFILE_ATTRIBUTES = [
FileStringContentAttribute(target_attr_name="ting_content"),
FrontmatterAndContentAttribute(
content_name="content", source_attr_name="ting_content"
),
ValueAttribute("config_dict", source_attr_name="content"),
# CnfTingAttribute(),
DictContentAttribute(
source_attr_name="content",
dict_name="config_dict",
default={},
copy_default=True,
),
]
def __init__(self):
super(ContextConfigTingCast, self).__init__(
class_name="ContextConfigProfile",
ting_attributes=ContextConfigTingCast.CNF_PROFILE_ATTRIBUTES,
ting_id_attr="filename_no_ext",
)
[docs]class ContextConfigs(TingTings):
"""A class to manage freckles profiles.
This reads all '*.profile' files in the freckles config folder. Those are later used to create a freckles context
(per profile). It also checks whether there exists a 'default.profile' file with the 'accept_freckles_license' value
set to 'true'. Only if that is the case will it allow custom profiles (mainly for security reasons - the user should
explicitely accept that certain configurations can be insecure).
"""
DEFAULT_TING_CAST = ContextConfigTingCast
[docs] @classmethod
def load_configs(cls):
cnf = Cnf({})
cnf.add_interpreter("root_config", FRECKLES_CONTEXT_SCHEMA)
profile_load_config = cnf.add_interpreter(
"profile_load", PROFILE_LOAD_CONFIG_SCHEMA
)
configs = ContextConfigs.from_folders(
"cnf_profiles",
FRECKLES_CONFIG_DIR,
load_config=profile_load_config,
cnf=cnf,
)
return configs
def __init__(self, repo_name, tingsets, cnf, **kwargs):
# check whether freckles license is accepted
self._default_config_path = os.path.join(FRECKLES_CONFIG_DIR, "default.context")
try:
with io.open(self._default_config_path, "r", encoding="utf-8") as f:
default_context_dict = yaml.load(f)
self._config_unlocked = (
default_context_dict.get(ACCEPT_FRECKLES_LICENSE_KEYNAME, False)
is True
)
except (Exception):
self._config_unlocked = False
if cnf is None:
raise Exception("Base configuration object can't be None.")
if "profile_load" not in cnf.get_interpreter_names():
raise Exception("No 'profile_load' cnf interpreter available.")
load_config = cnf.get_interpreter("profile_load")
if "root_config" not in cnf.get_interpreter_names():
raise Exception("No root_config profile interpreter in cnf.")
self._cnf = cnf
self._root_config = cnf.get_interpreter("root_config").config
self._default_profile_values = None
super(ContextConfigs, self).__init__(
repo_name=repo_name,
tingsets=tingsets,
load_config=load_config,
indexes=["filename_no_ext"],
)
[docs] def create_context_config(
self, context_name, config_chain=None, extra_repos=None, use_community=False
):
if config_chain is None:
config_chain = []
elif isinstance(config_chain, tuple):
config_chain = list(config_chain)
if isinstance(config_chain, (string_types, Mapping)):
config_chain = [config_chain]
elif not isinstance(config_chain, Iterable):
config_chain = [config_chain]
dict_chain = [self._root_config]
if os.path.exists(self._default_config_path):
# always use default config
config_chain.insert(0, "default")
for index, config in enumerate(config_chain):
# check if string
if isinstance(config, string_types):
config = config.strip()
if config in self.keys():
if not self._config_unlocked and config != "default":
raise FrecklesUnlockException(
"Access to context '{}' not allowed.".format(config)
)
# means we want the config from the file
config = self.get(config).config_dict
elif not config.startswith("{") and "=" in config:
key, value = config.split("=", 1)
if value.lower() in ["true", "yes"]:
value = True
elif value.lower() in ["false", "no"]:
value = False
# elif "::" in value:
# value = value.split("::")
else:
try:
value = int(value)
except (Exception):
# fine, we'll just use the string
# TODO: support lists
pass
config = {key: value}
elif config.startswith("{"):
# trying to read json
try:
config = json.loads(config)
except (Exception):
raise Exception(
"Can't assemble profile configuration, don't know how to handle: {}".format(
config
)
)
else:
if "=" not in config:
raise FrecklesConfigException(
msg="Could not assemble context configuration.",
reason="No configuration file '{}.context' in config dir.".format(
config
),
solution="Create context config '{}.context' in '{}' or use correct configuration syntax if that is not what you intended.".format(
config, FRECKLES_CONFIG_DIR
),
references={
"freckles configuration documentation": "https://freckles.io/doc/configuration"
},
)
raise Exception(
"Can't create profile configuration, invalid config: {}.".format(
config
)
)
if isinstance(config, Mapping):
dict_chain.append(config)
else:
raise Exception(
"Can't assemble profile configuration, unknown type '{}' for value '{}'".format(
type(config), config
)
)
cc = ContextConfig(
alias=context_name,
config_chain=dict_chain,
extra_repos=extra_repos,
use_community=use_community,
config_unlocked=self._config_unlocked,
)
return cc
[docs]class ContextConfig(object):
def __init__(
self,
alias,
config_chain=None,
extra_repos=None,
use_community=False,
config_unlocked=False,
):
if config_chain is None:
config_chain = []
if len(config_chain) > 0:
self._root_config = Cnf(config_chain[0])
else:
self._root_config = Cnf({})
self._config_unlocked = config_unlocked
self._alias = alias
self._config_chain = config_chain
self._extra_repos = extra_repos
self._use_community = use_community
merged = {}
for d in config_chain:
dict_merge(merged, d, copy_dct=False)
repos_to_add = []
if use_community:
repos_to_add.append("community")
if extra_repos:
if isinstance(extra_repos, string_types):
repos_to_add.append(extra_repos)
else:
repos_to_add.extend(list(extra_repos))
merged.setdefault("repos", [])
for repo in repos_to_add:
merged["repos"].append(repo)
self._config_dict = merged
self._cnf = Cnf(self._config_dict)
self.add_cnf_interpreter("context", FRECKLES_CONTEXT_SCHEMA)
@property
def cnf(self):
return self._cnf
@property
def config_unlocked(self):
return self._config_unlocked
[docs] def add_cnf_interpreter(self, interpreter_name, schema):
# we need that to compare with 'no config' option
self._root_config.add_interpreter(interpreter_name, schema)
self._cnf.add_interpreter(interpreter_name, schema)
[docs] def config(self, interpreter_name, *overlays):
if interpreter_name is None:
interpreter_name = "context"
interpreter = self._cnf.get_interpreter(interpreter_name)
config = interpreter.overlay(*overlays)
if self._config_unlocked:
return config
not_allowed = []
for k, v in config.items():
tags = interpreter.get_tags(k)
value = interpreter.get(k)
if "safe" not in tags:
orig_value = self._root_config.get_interpreter_value(
interpreter_name, k, None
)
if value != orig_value:
not_allowed.append(k)
if not_allowed:
raise FrecklesUnlockException(
"Access prevented to configuration key(s): {}.".format(
", ".join(not_allowed)
)
)
return config
[docs] def config_value(self, key, interpreter_name=None):
if interpreter_name is None:
interpreter_name = "context"
# print(interpreter_name)
interpreter = self._cnf.get_interpreter(interpreter_name)
if self._config_unlocked:
return interpreter.get(key)
# check whether this config value is 'safe'
try:
tags = interpreter.get_tags(key)
except (Exception):
tags = []
value = interpreter.get(key)
if "safe" not in tags:
orig_value = self._root_config.get_interpreter_value(
interpreter_name, key, None
)
if value != orig_value:
val = str(value)
if len(val) < 12:
val = " with (non-default) value '{}'".format(val)
raise FrecklesUnlockException(
"Setting of configuration key '{}'{} not allowed.".format(key, val)
)
return value