# Source code for freckles.output_callback
# -*- coding: utf-8 -*-
import logging
from collections import OrderedDict
import colorama
from frutils import readable_yaml, dict_merge
# Initialise colorama once, as an import-time side effect of this module
# (presumably so ANSI colour codes work on every platform -- confirm against
# the colorama documentation).
colorama.init()
# Module-level logger, registered under the fixed name "freckles".
log = logging.getLogger("freckles")
# Status strings; not referenced anywhere in the visible part of this module
# (presumably the states a task can report -- verify against the callers).
ALLOWED_STATUS = ["ok", "skipped", "changed", "failed"]
# extensions
# ------------------------------------------------------------------------
class FrecklesRun(object):
    """Plain container for the parameters describing one run.

    Every constructor argument is stored unchanged on the instance under an
    attribute of the same name; no validation or copying is performed.

    Args:
        run_id: identifier of the run.
        adapter_name: name of the adapter associated with the run.
        task_list: the tasks of the run (included in the ``str()`` output).
        run_vars: variables of the run.
        run_config: configuration of the run.
        run_env: environment of the run.
        run_properties: properties of the run (included in the ``str()``
            output).
    """

    def __init__(
        self,
        run_id,
        adapter_name,
        task_list,
        run_vars,
        run_config,
        run_env,
        run_properties,
    ):
        self.run_id = run_id
        self.adapter_name = adapter_name
        self.task_list = task_list
        self.run_vars = run_vars
        self.run_config = run_config
        self.run_env = run_env
        self.run_properties = run_properties

    def __str__(self):
        """Return a YAML rendering of the run's name, properties and tasks.

        Returns:
            Whatever ``readable_yaml`` produces for a mapping with the keys
            ``name``, ``run_properties`` and ``task_list``.
        """
        # ``frecklet_name`` is never assigned by ``__init__``, so reading it
        # directly made ``str(run)`` fail with AttributeError. Honour the
        # attribute when something else has set it, otherwise fall back to
        # the run id so the object can always be printed.
        name = getattr(self, "frecklet_name", self.run_id)
        return readable_yaml(
            {
                "name": name,
                "run_properties": self.run_properties,
                "task_list": self.task_list,
            }
        )
# Names accepted as ``result_strategy`` by FrecklesResultCallback.
RESULT_STRATEGIES = ["merge", "update", "append", "ordered_merge", "ordered_update"]


class FrecklesResultCallback(object):
    """Gather the results of a frecklecute run.

    Results are accumulated in ``self.result``; how each new result is
    folded in depends on the chosen strategy.

    Args:
        result_strategy: one of ``RESULT_STRATEGIES``.
            ``"append"`` collects every result in a list;
            ``"merge"`` / ``"ordered_merge"`` combine results with
            ``dict_merge``; ``"update"`` / ``"ordered_update"`` use
            ``dict.update``. The ``ordered_*`` variants accumulate into an
            ``OrderedDict``, the others into a plain ``dict``.

    Raises:
        ValueError: if ``result_strategy`` is not in ``RESULT_STRATEGIES``.
    """

    def __init__(self, result_strategy="merge"):
        if result_strategy not in RESULT_STRATEGIES:
            # ValueError is a subclass of Exception, so callers that caught
            # the previous bare Exception keep working.
            raise ValueError(
                "result_strategy '{}' not supported.".format(result_strategy)
            )
        self.result_strategy = result_strategy

        # Pick the accumulator type that matches the strategy.
        if result_strategy == "append":
            self.result = []
        elif "ordered" in result_strategy:
            self.result = OrderedDict()
        else:
            self.result = {}

    def add_result(self, overlay_dict):
        """Fold ``overlay_dict`` into ``self.result`` using the strategy.

        Args:
            overlay_dict: the result to add. For the merge and update
                strategies it must be a mapping; for ``"append"`` it is
                stored as-is.
        """
        # Substring checks so that the "ordered_*" variants share the branch
        # of their unordered counterpart.
        if "merge" in self.result_strategy:
            # copy_dct=False: merge into self.result in place.
            dict_merge(self.result, overlay_dict, copy_dct=False)
        elif "update" in self.result_strategy:
            self.result.update(overlay_dict)
        else:
            self.result.append(overlay_dict)

    def finish_up(self):
        """Hook called when the run is done; does nothing in this class."""
        pass